durusmail: quixote-users: Re: How do you handle long running requests?
How do you handle long running requests?
2003-07-18
2003-07-18
Re: How do you handle long running requests?
2003-07-18
2003-07-18
2003-07-18
2003-07-18
2003-07-19
2003-07-18
2003-07-19
Graham Fawcett (5 parts)
2003-07-20
2003-07-21
2003-07-21
2003-07-18
Re: How do you handle long running requests?
Graham Fawcett
2003-07-19
Graham Fawcett wrote:
> Actually, it would be very interesting, and probably quite simple, to
> write a Quixote-based tuple space engine: a bare-bones write/take
> interface with a pluggable backend (in-memory, ZODB, Berkeley, ...).
> Would anyone be interested in such a thing?

I couldn't resist.

For your amusement, attached is a trivial tuple space implementation, built
using Quixote and Medusa.

The space isn't persistent; there's no security; and it uses eval() and repr()
for data marshalling, so it's both dangerous and limited. Fortunately it's also
extremely short, so you could easily add features/restrictions as you see fit.

(It's also not threadsafe; but as long as you're running on Medusa, you're
single-threaded anyway, no problem. If you run it elsewhere, just ensure that
take() and write() in space.py are handled atomically.)

It's not a tuple space, formally speaking, in the sense that any old Python
tuple will not be a valid entry. I based it on the notion that your average
tuple is going to have a "type" and a set of attributes. So a tuple in my
context is a (tagname, dict) tuple.

Of course you can change this to any behaviour you want. (JavaSpaces, for
example, uses object pickles instead of tuples, and searches for them via an
object-query-language.) You might want to write Python pickles, or XML
documents, etc. Like I said earlier, the code is short. ;-)

Querying for a tuple is done via the take(tagname, **querydict) method.

     take('foo') will return any one tuple of tagname == 'foo'. If there are no
     foo's in the system, None is returned (it doesn't block).

     take('foo', bar=None) will return any one 'foo' tuple that has a 'bar' key
     in its dictionary. The value of bar doesn't matter.

     take('foo', bar='baz') will return any one 'foo' tuple
     such that tupledict['bar'] == 'baz'.

When you take a tuple, it's deleted from the tuple space. Some tuple space
implementations have read() methods that are non-destructive, but you should
try to stick to take() and write() until you've got the feel.


Here's one way you could use it, for example, if you had a need to
distribute the execution of long-running queries among processes/machines.

First, start your tuplespace server via server.py. It opens an HTTP socket,
defaulting to port 8088.

Generally speaking, your web app will write tuples with tagname=='Query' into
the tuple space:

     from client import TupleConnection

     conn = TupleConnection('localhost')
     sql = 'select * from bigtable'
     qid = get_unique_id()                                      # see below...
     conn.write('Query', sql=sql, id=qid)


Note the unique ID. This could be a (host, pid, sessionid) tuple or a big int,
etc. It could also be a hash or digest of your query (or even the query string
itself); as long as it uniquely identifies this query string, and not another,
then you should be fine.

You'd store this query ID somewhere in your Web app for later retrieval,
perhaps in the user session.

Meanwhile you have other processes running (on the same machine, or on others),
that take Query tuples and process them, returning Result tuples:

     conn = TupleConnection('localhost')
     while 1:
         tup = conn.take('Query') # take any Query at all
         if tup is None:
             # no query found
             time.sleep(5)
             continue
         sql = tup['sql']
         result_set = database_conn.execute(sql).fetchall()
         # convert result set to list of tuples, or XML, or whatever,
         # as long as it's marshallable
         conn.write(
             'Result',
             id=tup['id], # important
             result=result_set)


Then, back in your Web app, when the user asks for the results of the query:

     qid = fetch_query_id_from_wherever()
     conn = TupleConnection('localhost') # or conn = pool.get_connection()
     result = conn.take('Result', id=qid)
     if result is None:
         return 'Still waiting for result'
     else:
         return result['result']


That's it.

One last note: I forget who observed that a single-value tuple is semantically
equivalent to a semaphore. My point is, tuple spaces are about co-ordination,
not just communication, and locking/blocking based on takable tuples or their
absence can be an effective way to coordinate your efforts.

There's no reason you couldn't use a single tuple space to support a wide range
of unrelated applications, as long as they can all trust one another.

This was fun (and easy) to put together. I hope you find it interesting.

-- Graham

# webui.py

import space

SPACE = space.Space()

_q_exports = ['take', 'write', 'dump']

def write(req):
    data = req.stdin.read()
    tag, tupledict = eval(data)
    SPACE.write(tag, **tupledict)
    return 'OK'

def take(req):
    querytag, querydict = eval(req.stdin.read())
    m = SPACE.take(querytag, **querydict)
    return repr(m)

def dump(req):
    # returns a copy of the entire tuple space dict.
    # not really a tuple space verb, but you can use it for testing.
    return repr(SPACE.store)
# client.py

import httplib

class TupleConnection:
    """
    A connection to a tuple space.
    """

    def __init__(self, host, port=8088):
        self.conn = httplib.HTTPConnection(host, port)

    def __del__(self):
        self.conn.close()

    def _send(self, method, command, body=None):
        headers = {}
        if body is not None:
            headers['Content-Type'] = 'application/octet-stream'
        self.conn.request(method, command, body=body, headers=headers)
        r = self.conn.getresponse()
        assert r.status == 200, str((r.status, r.reason))
        data = r.read()
        return data

    def write(self, tag, **tupledict):
        """
        Write a tuple to the space.
        """
        data = self._send('post', '/write', repr( (tag, tupledict) ))
        assert data == 'OK', data

    def take(self, querytag, **querydict):
        """
        Fetch a tuple from the space that matches the query, or None.
        See space module for query details.
        """
        data = self._send('post', '/take', repr( (querytag, querydict) ))
        return eval(data)

    def dump(self):
        """
        Get a dump of the tuplespace's contents. Use this for
        debugging, not for production.
        """
        data = self._send('get', '/dump')
        return eval(data)


if __name__ == '__main__':

    s = TupleConnection('localhost', 8088)

    # in case you have some greetings in the tuple space
    # from an earlier test, let's consume them.
    while s.take('greeting') is not None:
        continue

    # put some 'greeting' tuples in the space
    s.write('greeting', name='Fred')
    s.write('greeting', name='Jim')
    s.write('greeting', name='Mary', age=15)

    # take a greeting where name == Fred
    g = s.take('greeting', name='Fred')
    assert g['name'] == 'Fred'

    # try to get another Fred greeting (there isn't one)
    g = s.take('greeting', name='Fred')
    assert g is None

    # take any greeting, must have an 'age' attribute
    g = s.take('greeting', age=None)
    assert g['name'] == 'Mary' and g['age'] == 15

    # take any greeting, regardless of attributes
    g = s.take('greeting')
    assert g['name'] == 'Jim'

    # take any greeting, but there aren't any left
    g = s.take('greeting')
    assert g is None

    print 'done'
# server.py

from quixote.publish import SessionPublisher
from quixote.server.medusa_http import QuixoteHandler, http_server
import asyncore

PORT = 8088
APP_ROOT = 'webui'
SERVER_NAME = 'Quixote/Tuples'

print 'Now serving application %s on port %d' % (APP_ROOT, PORT)
server = http_server.http_server('', PORT)

publisher = SessionPublisher(APP_ROOT)
# When initializing the Publisher in your own driver script,
# you'll want to parse a configuration file.
##publisher.read_config("/full/path/to/demo.conf")
publisher.setup_logs()

qh = QuixoteHandler(publisher, SERVER_NAME, server)
server.install_handler(qh)
asyncore.loop()

# space.py

class Space:
    """
    A very simple tuple space.
    """

    _mapping_class = dict  # or PersistentMapping, OOBTree, ...
    _list_class = list # or PersistentList, ...

    def __init__(self):
        self.store = self._mapping_class()

    def write(self, tag, **tdict):
        try:
            tuples_of_this_tag = self.store[tag]
        except KeyError:
            tuples_of_this_tag = self.store[tag] = self._list_class()
        tuples_of_this_tag.append(tdict)

    def take(self, tag, **qdict):
        m = self._get_match(tag, qdict)
        if m is not None:
            # 'take' means we remove it from the store.
            tuples_of_this_tag = self.store[tag]
            tuples_of_this_tag.remove(m)
            if len(tuples_of_this_tag) == 0:
                # no more in the list? delete it.
                del self.store[tag]
        return m


    def _get_match(self, tag, qdict):
        """
        Search the space for a matching tuple, return None if no match.

        Tuples must match on tag name, but also by dict keys and values.
        A matching tuple must have a dict key for each dict key in the
        query. If qdict[key] is not None, then match[key] must
        equal qdict[key].
        """
        try:
            matches = self.store[tag]
        except KeyError:
            return None

        qkeys = qdict.keys()
        for element in matches:
            try:
                ekeys = element.keys()
                if _is_subset(qkeys, ekeys):
                    # has all the keys we are looking for
                    # so the tuple is worth looking at
                    for qkey, qval in qdict.items():
                        # if the query value is None, then it matches
                        # any value, otherwise test for equality
                        if qval is not None:
                            if not element[qkey] == qval:
                                raise 'fail'
                    # got a success
                    return element
            except 'fail':
                continue
        return None

def _is_subset(sublist, superlist):
    for el in sublist:
        if not el in superlist:
            return 0
    return 1


def test():
    # our space is empty to begin with.
    s = Space()

    # put some 'greeting' tuples in the space
    s.write('greeting', name='Fred')
    s.write('greeting', name='Jim')
    s.write('greeting', name='Mary', age=15)

    # take a greeting where name==Fred
    g = s.take('greeting', name='Fred')
    assert g['name'] == 'Fred'

    # try to get another Fred greeting (there isn't one)
    g = s.take('greeting', name='Fred')
    assert g is None

    # take any greeting, must have an 'age' attribute
    g = s.take('greeting', age=None)
    assert g['name'] == 'Mary' and g['age'] == 15

    # take any greeting, regardless of attributes
    g = s.take('greeting')
    assert g['name'] == 'Jim'

    # take any greeting, but there aren't any left
    g = s.take('greeting')
    assert g is None

    # there are no tuples left in our store!
    assert len(s.store) == 0

    print 'done'

if __name__ == '__main__':
    test()
reply