from functools import partial
from twisted.python import log
from twisted.internet.defer import inlineCallbacks, returnValue, maybeDeferred, Deferred
from ..error import NoRouteForTask, RejectRecipient
from ..model.offer import offers
from ..model.job import exJobs
from .pool import taskPool
class Task:
'''A Task is one message to one recipient and is a part of one :class:`iro.model.job.ExJob`.
'''
def __init__(self, recipient, job):
"""
:param recipient: a recipient
:param `iro.model.job.ExJob` job: connected job
"""
self.recipient = recipient
self.job = job
self.status = None
self.error = False
def setStatus(self,status):
"""callback, to set status of task"""
self.status = status
return status
def setError(self, error):
"""errback to set error of task"""
self.status = error
self.error = True
return error
def start(self):
"""Starting to send message to recipient.
:return: a defer, that is fired, when message is sended successfully over any offer.
"""
self.d = Deferred()
self.d.addCallback(self.setStatus)
self.d.addCallback(partial(self.job.setStatus,self))
self.d.addErrback(self.setError)
self.d.addErrback(partial(self.job.setError,self))
taskPool.run(self._run)
return self.d
def _run(self):
"""sends the message to recipient, tries all possible offers."""
os= iter(self.job.offers)
def n():
try:
offer = os.next()
d = maybeDeferred(offers[offer],self.recipient,self.job.message)
d.addCallback(self.d.callback)
d.addErrback(addErr,offer)
d.addErrback(self.d.errback)
return d
except StopIteration:
self.d.errback(NoRouteForTask())
def addErr(failure, offer):
if not isinstance(failure.value, RejectRecipient):
log.err(_why="Job(%s): Send to '%s' failed via '%s'"%(self.job.dbjob, self.recipient, offer),_stuff=failure)
n()
n()
@inlineCallbacks
def createJob(user,recipients, msg, offers, info=None):
"""Creates a :class:`iro.model.job.ExJob` and start for all recipients one task.
:param `iro.model.schema.User` user: the sender
:param `iro.model.message.Message` msg: the message
:param list offers: a list of possible offer and provider names, to try to send the message over. The first entry will be tried first.
:param string info: a bill group name
:return: the new :class:`iro.model.job.ExJob` object.
"""
job = yield exJobs.create(user, recipients, msg, offers, info)
for r in recipients:
task = Task(r,job)
job.addTask(task)
task.start()
returnValue(job)