diff -r eb04ac3a8327 -r 3f4bdea2abbf iro/model/job.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/model/job.py Thu Sep 27 17:15:46 2012 +0200 @@ -0,0 +1,146 @@ +# Copyright (c) 2012 netzguerilla.net +# +# This file is part of Iro. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the +# #Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +from twisted.python import log, threadable + +from datetime import datetime +from collections import MutableMapping + +import schema +import offer +from .dbdefer import dbdefer + +class ExJob: + ''' A ExJob object represents a message to multiple recipients over multiple offers to send. + + One single message to one recipient is handeld in :class:`iro.controller.task.Task`. + This class holds connections to all tasks. + This class is responsiple to update the status in database of one job and updating the bill. + ''' + + synchronized = ["incStatus", "_status"] + + def __init__(self, dbjob, recipients, message, offers): + """Constructor of ExJob. + + :param dbjob: primary key of the job element in database + :param list recipients: list of all recipients + :param `iro.model.message.Message` message: message to send + :param list offers: list of all possible offers to send message over + """ + + self.dbjob = dbjob #Connection to database job element (id) + self.message = message + self.recipients = recipients + self.offers = offers + self.tasks={} + self.c = 0 + self.status = "started" + log.msg("Job(%s) created."%(self.dbjob)) + + def addTask(self,task): + """adding a task to tasks dict - key is the recipient. + + :param `iro.controller.task.Task` task: a task + """ + self.tasks[task.recipient] = task + + def incStatus(self): + """increments the processed messages (function is threadsafe).""" + self.c += 1 + return self.c + + def _status(self, session, status): + """updates the status of the database object (function is threadsafe). + + :param session: a valid database session + :param string status: new status + """ + job = schema.Job.get(session, self.dbjob) + if self.status == "error": + return + elif self.status == "sended" and status != "error": + return + job.status = status + self.status = status + log.msg("Job(%s) status changed to: %s."%(self.dbjob, status)) + session.commit() + + @dbdefer + def setStatus(self, session, task, status): + """callback of one task. + + This function updates the database object and the bill. + """ + c = self.incStatus() + job = schema.Job.get(session, self.dbjob) + + if c == len(self.recipients): + self._status(session,"sended") + elif job.status in ["started","init"]: + self._status(session,"sending") + + if status.costs > 0: + o = schema.Offer.get(session, status.provider.name, status.route, self.message.typ) + job.messages.append(schema.Message(price=status.costs, isBilled=False, recipient=str(task.recipient), count=status.count, exID=status.exID, date=datetime.today(), offer=o)) + session.commit() + + log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route)) + + @dbdefer + def setError(self, session, task, err): + """errback for one task. + + This function updates the database object. + """ + self.incStatus() + if self.status != "error": + self._status(session,"error") + log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err) + +threadable.synchronize(ExJob) + +class ExJobs(dict, MutableMapping): + """ a dict to handle all jobs. + """ + @dbdefer + def create(self, session, user, recipients, message, offers, info=None): + """creates on new Job. + + :param session: a valid session ( created by decorator :func:`iro.model.dbdefer.dbdefer`) + :param `iro.model.schema.User` user: a user object + :param list recipients: list of all recipients + :param `iro.model.message.Message` message: message to send + :param list offers: a list of offers ( list will be reduced to the allowed offers for the **user** -- using :func:`iro.model.offer.extendProvider`) + :param string info: a bill group name + :returns: the new job + """ + user = session.merge(user) + job = schema.Job(info=info, status="started") + user.jobs.append(job) + session.commit() + + o = offer.extendProvider(user, message.typ, offers, session=session) + self[job.id] = ExJob(job.id, recipients, message, o) + return self[job.id] + +exJobs = ExJobs() +"""the dict of all available jobs."""