moving DummyObserver and DummyPool -> iro.test_helpers.utils
DBTestCase now using DummyObserver out of the box
from functools import partial
from twisted.internet.defer import inlineCallbacks, returnValue, maybeDeferred, Deferred
from ..error import NoRouteForTask, RejectRecipient
from ..model.offer import offers
from ..model.job import exJobs
from .pool import taskPool
class Task:
'''one single part of a job.
a task is one message to one recipient
'''
def __init__(self, recipient, job):
self.recipient = recipient
self.job = job
self.status = None
self.error = False
def setStatus(self,status):
self.status = status
return status
def setError(self, error):
self.status = error
self.error = True
return error
def start(self):
self.d = Deferred()
self.d.addCallback(self.setStatus)
self.d.addCallback(partial(self.job.setStatus,self))
self.d.addErrback(self.setError)
self.d.addErrback(partial(self.job.setError,self))
taskPool.run(self._run)
return self.d
def _run(self):
os= iter(self.job.offers)
def n():
try:
offer = os.next()
d = maybeDeferred(offers[offer],self.recipient,self.job.message)
d.addCallback(self.d.callback)
d.addErrback(addErr)
d.addErrback(self.d.errback)
return d
except StopIteration:
self.d.errback(NoRouteForTask())
def addErr(failure):
failure.trap(RejectRecipient)
return n()
n()
@inlineCallbacks
def createJob(user,recipients, msg, offers, info=None):
job = yield exJobs.create(user, recipients, msg, offers, info)
for r in recipients:
task = Task(r,job)
job.addTask(task)
task.start()
returnValue(job)