#!/usr/bin/python
"""Run callables in background threads and collect their results later.

``Deferrer`` starts one thread per deferred call and keeps the outcome (result
or exception) in an internal bin, keyed by a generated task ID, until the
caller pops it or housekeeping evicts it.  Module-level convenience functions
operate on a single process-wide ``Deferrer``.  ``QxDeferrer`` adds a simple
browser "refresh until done" loop on top (see the usage notes further down).
"""

import threading
import time

try:
    import hashlib
    _newMD5 = hashlib.md5
except ImportError:
    # Very old interpreters have no hashlib; fall back to the legacy module.
    import md5
    _newMD5 = md5.new


class ThreadTask(object):
    """ Just a convenient way of grouping related info for a Deferrer

    Attributes:
        id              -- key of this task in the Deferrer's bin
        func            -- the callable to run
        args, kwargs    -- positional / keyword arguments for func
        result          -- return value of func (None until it completes)
        exception       -- exception raised by func, if any
        ready           -- True once func has finished (either way)
        startTime       -- time.time() when the task was created
        completionTime  -- time.time() when func finished, None before that
    """

    def __init__(self, id, func, args=None, kwargs=None):
        # Normalise missing arguments so func(*args, **kwargs) always works.
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        self.id = id
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.result = None
        self.ready = False
        self.exception = None
        self.startTime = time.time()
        self.completionTime = None

    @property
    def age(self):
        """ Seconds since the task completed (None if not complete) """
        if self.completionTime is None:
            return None
        return time.time() - self.completionTime

    @property
    def elapsedTime(self):
        """ Seconds the task took to complete (None if not complete) """
        if self.completionTime is None:
            return None
        return self.completionTime - self.startTime

    @property
    def staleness(self):
        """ Ratio [age of result]/[time to compute result]
        (None if the task is not complete) """
        if self.completionTime is None:
            return None
        elapsed = self.elapsedTime
        if elapsed <= 0:
            # The task finished within clock resolution; avoid dividing by
            # zero and treat the result as maximally stale, so that only
            # the age limit decides whether it is kept.
            return float("inf")
        return self.age / elapsed


class Deferrer(object):
    """ Runs callables in their own threads and stores the outcome until
    it is popped or evicted by removeCompletedTasks(). """

    # Default eviction limits used by removeCompletedTasks().
    maxResultAge = 30
    maxStaleness = 3

    def __init__(self):
        self.__bin = {}         # task ID -> ThreadTask
        self.__ID = 0           # counter mixed into generated IDs
        self.__usedIDs = {}     # every ID ever handed out

    def defer(self, func, args=(), kwargs=None):
        """ Start func(*args, **kwargs) in a new thread and return the
        task ID to use with taskReady() and pop(). """
        if kwargs is None:
            kwargs = {}
        # Do some housekeeping
        self.removeCompletedTasks()
        id = self.__addTask(func, args, kwargs)
        t = self.__setupThread(id)
        t.start()
        return id

    def taskReady(self, id):
        """ Returns a task's ready status
        (raises KeyError for an unknown id) """
        return self.__bin[id].ready

    def pop(self, id):
        """ If it's done, return the result (or raise its exception).
        If it's not, raise an exception.

        A task that failed stays in the bin after its exception has been
        re-raised; it is only dropped by removeCompletedTasks()."""
        if not self.taskReady(id):
            raise RuntimeError("Task not complete yet")
        task = self.__bin[id]
        if task.exception is not None:
            raise task.exception
        result = task.result
        del self.__bin[id]
        # Do some housekeeping
        self.removeCompletedTasks()
        return result

    def has_key(self, id):
        """ True if a task with this id is currently in the bin """
        return id in self.__bin

    def taskCount(self):
        """ Number of tasks (running or complete) currently in the bin """
        return len(self.__bin)

    def removeCompletedTasks(self, olderThan=None, stalerThan=None):
        """ Removes tasks that have been complete for more than
        'olderThan' seconds (default: maxResultAge), and are staler than
        'stalerThan' (default: maxStaleness).

        Staleness is a ratio computed by:
            [age of result]/[time to compute result]
        Eg. result is 90 seconds old, and it took 30 seconds to compute
        result: staleness = 3

        Returns the number of tasks removed."""
        if olderThan is None:
            olderThan = self.maxResultAge
        if stalerThan is None:
            stalerThan = self.maxStaleness
        removed = 0
        # Iterate over a snapshot: worker threads and other callers may
        # touch the bin while we are looking at it.
        for task in list(self.__bin.values()):
            if not task.ready:
                continue
            age = task.age
            staleness = task.staleness
            if age is None or staleness is None:
                continue
            if staleness > stalerThan and age > olderThan:
                # pop() instead of del: someone else may have removed it.
                if self.__bin.pop(task.id, None) is not None:
                    removed += 1
        return removed

    def __addTask(self, func, args, kwargs):
        """ Create a ThreadTask, file it in the bin and return its ID """
        id = self.__nextID()
        self.__bin[id] = ThreadTask(id, func, args, kwargs)
        return id

    def __nextID(self):
        """ Generate a hex-digest ID that has not been handed out before """
        self.__ID += 1
        idString = None
        while idString is None or idString in self.__usedIDs:
            seed = "%s%s" % (self.__ID, time.time())
            idString = _newMD5(seed.encode("utf-8")).hexdigest()
        self.__usedIDs[idString] = idString
        return idString

    def __setupThread(self, id):
        """ Build (but do not start) the worker thread for a task """
        return threading.Thread(target=self.__start, args=(id,))

    def __start(self, id):
        """ We're in a new thread now... Feel different? """
        # get the task
        task = self.__bin[id]
        # run the task, and store the result
        try:
            task.result = task.func(*task.args, **task.kwargs)
        except Exception as e:
            # Whoops....
            task.exception = e
        # Mark the completion time on the task itself, *before* flagging it
        # ready, so age/staleness are valid as soon as anyone sees 'ready'.
        task.completionTime = time.time()
        # mark the task as ready (done)
        task.ready = True
        # The task object was updated in place, so it is deliberately not
        # written back to the bin: doing so could resurrect a task that
        # pop() removed right after 'ready' became True.


############################################################################
# Module level functions to make it more convenient to have a single
# deferrer for a process.
############################################################################
__globalDeferrer = Deferrer()


def defer(func, args=(), kwargs=None):
    """ Defer a call on the process-wide deferrer; returns the task ID """
    return __globalDeferrer.defer(func, args, kwargs)


def taskReady(id):
    """ Returns a task's ready status """
    return __globalDeferrer.taskReady(id)


def pop(id):
    """ If it's done, return the result (or raise its exception).
    If it's not, raise an exception"""
    return __globalDeferrer.pop(id)


def has_key(id):
    """ True if the process-wide deferrer holds a task with this id """
    return __globalDeferrer.has_key(id)


def taskCount():
    """ Number of tasks held by the process-wide deferrer """
    return __globalDeferrer.taskCount()


def removeCompletedTasks(olderThan=30, stalerThan=3):
    """ Evict completed tasks older than 'olderThan' seconds and staler
    than 'stalerThan'; returns the number of tasks removed """
    return __globalDeferrer.removeCompletedTasks(olderThan, stalerThan)


class QxDeferrer(Deferrer):
    """ This subclass takes the deferrer, and manages rudimentary browser
    interaction (redirect loop until the task is complete)"""

    # Seconds between automatic refreshes of the processing page.
    refreshInterval = 2
    # Upper bound on the progress dots rendered; 'counter' comes from the
    # request, so it must not be allowed to size the page without limit.
    maxDots = 100

    def getID(self, request):
        """ Return the task ID carried by the request's form, if any """
        return request.form.get("taskID")

    def getPage(self, func, request):
        """ Return the finished result for the request's task, or start
        the task (if needed) and return the "processing" page.

        func is deferred as func(request, True)."""
        id = self.getID(request)
        known = bool(id) and self.has_key(id)
        if known and Deferrer.taskReady(self, id):
            request.response.expire_cookie("qx_test_1", path="/")
            request.response.expire_cookie("qx_test_2", path="/")
            result = Deferrer.pop(self, id)
            return self.crackResultAndCopyHeaders(result, request)
        if not known:
            id = Deferrer.defer(self, func, (request, True))
            request.response.set_cookie("qx_test_1", "1", path="/")
            request.response.set_cookie("qx_test_2", "2", path="/")
        return self.getProcessingPage(request, id)

    def crackResultAndCopyHeaders(self, result, request):
        """ Some results will be a tuple (result, original_request)
        This may be done by the caller, for example, if it is known that
        a redirect will be issued.

        If you need to use this feature, the key is this:
        In the function that is being 'deferred', when you return the
        result, also return the [original] request.
        Ex. Instead of:
            "return request.redirect('/URL?msg=Done')"
        do this:
            "return request.redirect('/URL?msg=Done'), request"

        By doing that, this function has the opportunity to copy the
        relevant headers from the request the function was working on to
        the [now] current request that the browser is issuing in its
        redirect loop. This way, the redirect issued by the originally
        called function will work (otherwise, they'd just get the screen
        that has the info to the effect of "You should have been
        redirected, but it didn't work. Click here.)
        """
        if type(result) is not tuple:
            # If Result is not a tuple, just return it
            # (no fanciness needed)
            return result
        # If result is a tuple, crack it, and then copy the
        # response.headers from the original request to the
        # current request (like Location: directives...)
        result, o_req = result
        # Copy the headers
        request.response.headers.update(o_req.response.headers)
        # Copy the status and reason
        request.response.status_code = o_req.response.status_code
        request.response.reason_phrase = o_req.response.reason_phrase
        return result

    def processing(self, request):
        """ True if the request belongs to a deferrer redirect loop
        (i.e. it carries a task ID) """
        return bool(self.getID(request))

    def getProcessingPage(self, request, id):
        """ The main reason to subclass this class would probably be to
        override this function, to provide a nicer page that fits your
        app's look and feel"""
        try:
            counter = int(request.form.get("counter", "0"))
        except (TypeError, ValueError):
            # 'counter' is client-supplied; treat garbage as a fresh start.
            counter = 0
        counter += 1
        url = request.get_url() + "?taskID=%s&counter=%s" % (id, counter)
        dots = ". " * max(0, min(counter, self.maxDots))
        refreshInterval = self.refreshInterval
        request.response.set_header(
            "refresh", "%s;url=%s" % (refreshInterval, url))
        # NOTE(review): the page text below looks like it once contained
        # HTML markup (e.g. a link behind "click here") that is missing in
        # this copy of the file -- confirm against the original template.
        return """\
Processing...
Your request is being processed%(dots)s

This page should be refreshed every %(refreshInterval)s seconds and will display your results when they have been completed. If this page doesn't refresh automatically, please click here to force manual refreshes.
\n""" % {"dots": dots, "refreshInterval": refreshInterval}
        #+ request.dump_html()


'''
An example of one way to use this class in Quixote follows:

Given a BigTimeConsumingJob that simply does its thing when called, and
returns the result (with no built-in prompting for confirmation),
conversion to use the QxDeferrer is as simple as adding the 'processTask'
parameter to the function call, with a default of False, and the 'if'
test as the first step of the function (any logic between the function's
def and the if will be executed twice, once by the 'main' thread, and once
by the spawned thread, unless you take additional measures.)

Lines with the ### are inserted/modified to accommodate the deferrer.
Everything else remains just as it was before deciding to use the deferrer.

    def btcj [html] (self, request
                     ,processTask=False ###
                     ):
        if not processTask: ###
            return theDeferrer.getPage(self.btcj, request) ###
        starttime = time.time()
        time.sleep(10)
        res = time.time() - starttime
        header(request, "BTCJ")
        """\
        That took %(res)0.2f seconds
        """ % vars()
        footer(request)

A more elaborate example follows:
(this demonstrates the ability of the deferrer to correctly handle
redirects.)

    def btcj_2 [html] (self, request
                       , processTask=False ###
                       ):
        if theDeferrer.processing(request): ###
            # If the deferrer is handling the job, ###
            # bypass all the logic below, ###
            # and just check the status. ###
            return theDeferrer.getPage(self.btcj_2, request) ###
        confirm = request.form.get('confirm', None)
        if confirm is None:
            doitLink = href(request.get_url() +
                            quixote.html.html_quote("?confirm=1"), "Do it")
            cancelLink = href(request.get_url() +
                              quixote.html.html_quote("?confirm=0"), "Cancel")
            header(request, "BTCJ-2")
            """\
            Perform Big Time Consuming Job? This could take a while.
            Are you sure ?
            %(doitLink)s %(cancelLink)s
            """ % vars()
            footer(request)
        else:
            if confirm == '0':
                msg = "BTCJ Canceled"
            elif confirm == '1':
                if not processTask: ###
                    return theDeferrer.getPage( ###
                        self.btcj_2, request) ###
                starttime = time.time()
                time.sleep(10)
                res = time.time() - starttime
                msg = "That took %(res)0.2f seconds" % vars()
            return self.goto_index(msg), request
'''

_QxDeferrer = QxDeferrer()

if __name__ == "__main__":
    # Small demo: defer an expensive call and keep busy until it is done.
    def fib(x):
        assert x == int(x) and x >= 0, "Positive integers only, please"
        if x < 2:
            return x
        else:
            return fib(x-1) + fib(x-2)

    def getFib(x):
        r = fib(x)
        return "fib(%s) = %s" % (x, r)

    print("defering call...")
    myID = defer(getFib, (23,))
    i = 0
    while not taskReady(myID):
        print("not yet... %s" % (getFib(i)))
        i += 1
    result = pop(myID)
    print("result of defered call is now in: %s" % result)