diff -r eb04ac3a8327 -r 3f4bdea2abbf iro/controller/task.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/controller/task.py Thu Sep 27 17:15:46 2012 +0200 @@ -0,0 +1,108 @@ +# Copyright (c) 2012 netzguerilla.net +# +# This file is part of Iro. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the +# #Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +from functools import partial + +from twisted.python import log +from twisted.internet.defer import inlineCallbacks, returnValue, maybeDeferred, Deferred + +from ..error import NoRouteForTask, RejectRecipient + +from ..model.offer import offers +from ..model.job import exJobs + +from .pool import taskPool + +class Task: + '''A Task is one message to one recipient and is a part of one :class:`iro.model.job.ExJob`. + ''' + def __init__(self, recipient, job): + """ + :param recipient: a recipient + :param `iro.model.job.ExJob` job: connected job + """ + self.recipient = recipient + self.job = job + self.status = None + self.error = False + + def setStatus(self,status): + """callback, to set status of task""" + self.status = status + return status + + def setError(self, error): + """errback to set error of task""" + self.status = error + self.error = True + return error + + def start(self): + """Starting to send message to recipient. + + :return: a defer, that is fired, when message is sended successfully over any offer. + """ + self.d = Deferred() + self.d.addCallback(self.setStatus) + self.d.addCallback(partial(self.job.setStatus,self)) + self.d.addErrback(self.setError) + self.d.addErrback(partial(self.job.setError,self)) + taskPool.run(self._run) + return self.d + + def _run(self): + """sends the message to recipient, tries all possible offers.""" + os= iter(self.job.offers) + def n(): + try: + offer = os.next() + d = maybeDeferred(offers[offer],self.recipient,self.job.message) + d.addCallback(self.d.callback) + d.addErrback(addErr,offer) + d.addErrback(self.d.errback) + return d + except StopIteration: + self.d.errback(NoRouteForTask()) + + def addErr(failure, offer): + if not isinstance(failure.value, RejectRecipient): + log.err(_why="Job(%s): Send to '%s' failed via '%s'"%(self.job.dbjob, self.recipient, offer),_stuff=failure) + n() + + n() + + +@inlineCallbacks +def createJob(user,recipients, msg, offers, info=None): + """Creates a :class:`iro.model.job.ExJob` and start for all recipients one task. + + :param `iro.model.schema.User` user: the sender + :param `iro.model.message.Message` msg: the message + :param list offers: a list of possible offer and provider names, to try to send the message over. The first entry will be tried first. + :param string info: a bill group name + :return: the new :class:`iro.model.job.ExJob` object. + """ + job = yield exJobs.create(user, recipients, msg, offers, info) + for r in recipients: + task = Task(r,job) + job.addTask(task) + task.start() + returnValue(job)