iro/model/job.py
author Sandro Knauß <knauss@netzguerilla.net>
Thu, 29 Mar 2012 16:27:40 +0200
branchdevel
changeset 258 0a5eb5aac0be
parent 245 4526747a42ca
child 263 52284710c0b4
permissions -rw-r--r--
iro.model: adding docstring

from twisted.python import log, threadable

from datetime import datetime
from collections import MutableMapping

import schema
import offer
from .dbdefer import dbdefer

class ExJob:
    ''' A ExJob object represents a message to multiple recipients over multiple offers to send.  

    One single message to one recipient is handeld in :class:`iro.controller.task.Task`.
    This class holds connections to all tasks.
    This class is responsiple to update the status in database of one job and updating the bill.
    '''
    
    synchronized = ["incStatus", "_status"]

    def __init__(self, dbjob, recipients, message, offers):
        """Constructor of ExJob.
        
        :param dbjob: primary key of the job element in database
        :param list recipients: list of all recipients
        :param `iro.model.message.Message` message: message to send
        :param list offers: list of all possible offers to send message over
        """

        self.dbjob = dbjob       #Connection to database job element (id)
        self.message = message
        self.recipients = recipients
        self.offers = offers
        self.tasks={}
        self.c = 0
        self.status = "started"
        log.msg("Job(%s) created."%(self.dbjob))

    def addTask(self,task):
        """adding a task to tasks dict - key is the recipient.
        
        :param `iro.controller.task.Task` task: a task
        """
        self.tasks[task.recipient] = task

    def incStatus(self):
        """increments the processed messages (function is threadsafe)."""
        self.c += 1
        return self.c

    def _status(self, session, status):
        """updates the status of the database object (function is threadsafe).

        :param session: a valid database session
        :param string status: new status
        """
        job = schema.Job.get(session, self.dbjob)
        if self.status == "error":
            return
        elif self.status == "sended" and status != "error":
            return
        job.status = status
        self.status = status
        log.msg("Job(%s) status changed to: %s."%(self.dbjob, status))
        session.commit()

    @dbdefer
    def setStatus(self, session, task, status):
        """callback of one task.
        
        This function  updates the database object and the bill.
        """
        c = self.incStatus()
        job = schema.Job.get(session, self.dbjob)
        
        if c == len(self.recipients):
            self._status(session,"sended") 
        elif job.status in ["started","init"]:
            self._status(session,"sending")

        if status.costs > 0:
            o = schema.Offer.get(session, status.provider.name, status.route, self.message.typ)
            job.messages.append(schema.Message(price=status.costs, isBilled=False, recipient=str(task.recipient), count=status.count, exID=status.exID, date=datetime.today(), offer=o))
        session.commit()
        
        log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route))

    @dbdefer
    def setError(self, session, task, err):
        """errback for one task.

        This function updates the database object.
        """
        self.incStatus()
        if self.status != "error":
            self._status(session,"error")
        log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err)

threadable.synchronize(ExJob)

class ExJobs(dict, MutableMapping):
    """ a dict to handle all jobs.
    """
    @dbdefer
    def create(self, session, user, recipients, message, offers, info=None):
        """creates on new Job.
        
        :param session: a valid session ( created by decorator :func:`iro.model.dbdefer.dbdefer`)
        :param `iro.model.schema.User` user: a user object
        :param list recipients: list of all recipients
        :param `iro.model.message.Message` message: message to send
        :param list offers: a list of offers ( list will be reduced to the allowed offers for the **user** -- using :func:`iro.model.offer.extendProvider`)
        :returns: the new job
        """
        user = session.merge(user)
        job = schema.Job(info=info, status="started")
        user.jobs.append(job)
        session.commit()
        
        o = offer.extendProvider(user, message.typ, offers, session=session)
        self[job.id] = ExJob(job.id, recipients, message, o)
        return self[job.id]

exJobs = ExJobs()
"""the dict of all available jobs."""