#!/usr/bin/python import threading import time import md5 class ThreadTask: """ Just a convenient way of grouping related info for a Deferrer """ def __init__(self, id, func, args=None, kwargs=None): self.id = id self.func = func self.args = args self.kwargs = kwargs self.result = None self.ready = False self.exception = None self.startTime = time.time() self.completionTime = None def __getAge(self): if self.completionTime: return time.time() - self.completionTime else: return None age = property(__getAge) def __getElapsedTime(self): if self.completionTime: return self.completionTime - self.startTime else: return None elapsedTime = property(__getElapsedTime) def __getStaleness(self): if self.completionTime: return self.age/self.elapsedTime else: return None staleness = property(__getStaleness) class Deferrer(object): maxResultAge = 30 maxStaleness = 3 def __init__(self): self.__bin = {} self.__ID = 0 self.__usedIDs = {} def defer(self, func, args=(), kwargs={}): # Do some housekeeping self.removeCompletedTasks() id = self.__addTask(func, args, kwargs) t = self.__setupThread(id) t.start() return id def taskReady(self, id): """ Returns a task's ready status """ return self.__bin[id].ready def pop(self, id): """ If it's done, return the result (or raise it's exception). if it's not, raise an exception""" if not self.taskReady(id): raise RuntimeError("Task not complete yet") task = self.__bin[id] if task.exception: raise task.exception result = task.result del self.__bin[id] # Do some housekeeping self.removeCompletedTasks() return result def has_key(self, id): return self.__bin.has_key(id) def taskCount(self): return len(self.__bin) def removeCompletedTasks(self): """ Removes tasks that have been complete for more than 'maxResultAge' seconds, and are staler than 'maxStaleness'. Staleness is a ratio computed by: [age of result]/[time to compute result] Eg. result is 90 seconds old, and it took 30 seconds to compute result: staleness = 3""" i = 0 for task in [task for task in self.__bin.values() if task.ready]: if task.staleness > self.maxStaleness and task.age > self.maxResultAge: del self.__bin[task.id] i += 1 return i def __addTask(self, func, args, kwargs): id = self.__nextID() task = ThreadTask(id, func, args, kwargs) self.__bin[id] = task return id def __nextID(self): self.__ID += 1 idString = None while idString is None or self.__usedIDs.has_key(idString): idString = md5.new("%s%s" % (self.__ID, time.time())).hexdigest() self.__usedIDs[idString] = idString return idString def __setupThread(self, id): t = threading.Thread(target=self.__start, args=(id,)) return t def __start(self, id): """ We're in a new thread now... Feel different? """ # get the task task = self.__bin[id] # run the task, and store the result try: task.result = task.func(*task.args, **task.kwargs) except Exception, e: # Whoops.... task.exception = e # Mark the completion time self.completionTime = time.time() # mark the task as ready (done) task.ready = True # put the task back in the bin. self.__bin[id] = task ############################################################################ # Module level functions to make it more convenient to have a single # deferrer for a process. ############################################################################ __globalDeferrer = Deferrer() def defer(func, args=(), kwargs={}): return __globalDeferrer.defer(func, args, kwargs) def taskReady(id): """ Returns a task's ready status """ return __globalDeferrer.taskReady(id) def pop(id): """ If it's done, return the result (or raise it's exception). if it's not, raise an exception""" return __globalDeferrer.pop(id) def has_key(id): return __globalDeferrer.has_key(id) def taskCount(): return __globalDeferrer.taskCount() def removeCompletedTasks(olderThan=30, stalerThan=3): return __globalDeferrer.removeCompletedTasks(olderThan, stalerThan) class QxDeferrer(Deferrer): """ This subclass takes the deferrer, and manages rudimentary browser interaction (redirect loop until the task is complete)""" refreshInterval = 2 def getID(self, request): id = request.form.get("taskID") return id def getPage(self, func, request): id = self.getID(request) if id and self.has_key(id) and Deferrer.taskReady(self, id): request.response.expire_cookie("qx_test_1", path="/") request.response.expire_cookie("qx_test_2", path="/") result = Deferrer.pop(self, id) result = self.crackResultAndCopyHeaders(result, request) return result else: if not (id and self.has_key(id)): id = Deferrer.defer(self, func, (request,True)) request.response.set_cookie("qx_test_1", "1", path="/") request.response.set_cookie("qx_test_2", "2", path="/") page = self.getProcessingPage(request, id) return page def crackResultAndCopyHeaders(self, result, request): """ Some results will be a tuple (result, original_request) This may be done by the caller, for example, if it is known that a redirect will be issued. If you need to use this feature, the key is this: In the function that is being 'deferred', when you return the result, also return the [original] request. Ex. Instead of: "return request.redirect('/URL?msg=Done')" do this: "return request.redirect('/URL?msg=Done'), request" By doing that, this function has the opportunity to copy the relevant headers from the request the function was working on to the [now] current request that the browser is issueing in it's redirect loop. This way, the redirect issued by the originally called function will work (otherwise, they'd just get the screen that has the info to the effect of "You should have been redirected, but it didn't work. Click here.) """ if type(result) != type((0,1)): # If Result is not a tuple, just return it # (no fanciness needed) return result else: # If result is a tuple, crack it, and then copy the # response.headers from the original request to the # current request (like Location: directives...) result, o_req = result # Copy the headers request.response.headers.update(o_req.response.headers) # Copy the status and reason request.response.status_code = o_req.response.status_code request.response.reason_phrase = o_req.response.reason_phrase return result def processing(self, request): if self.getID(request): return True def getProcessingPage(self, request, id): """ The main reason to subclass this class would probably be to override this function, to provide a nicer page that fits your app's look and feel""" counter = int(request.form.get("counter","0")) counter += 1 url = request.get_url() + "?taskID=%s&counter=%s" % (id, counter) dots = ". " * counter refreshInterval = self.refreshInterval request.response.set_header("refresh", "%(refreshInterval)s;url=%(url)s" % vars()) return """\
Your request is being processed%(dots)s
This page should be refreshed every %(refreshInterval)s seconds and will display your results when they have been completed. If this page doesn't refresh automatically, please click here to force manual refreshes. |
Are you sure ? | |
%(doitLink)s | %(cancelLink)s |