iro/model/job.py
author Sandro Knauß <knauss@netzguerilla.net>
Fri, 24 Aug 2012 01:05:06 +0200
branchdevel
changeset 294 0e75bd39767d
parent 263 52284710c0b4
child 312 42fd5075a5d1
permissions -rw-r--r--
adding LICENSE to all files

# Copyright (c) 2012 netzguerilla.net <iro@netzguerilla.net>
# 
# This file is part of Iro.
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
# #Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

from twisted.python import log, threadable

from datetime import datetime
from collections import MutableMapping

import schema
import offer
from .dbdefer import dbdefer

class ExJob:
    ''' A ExJob object represents a message to multiple recipients over multiple offers to send.  

    One single message to one recipient is handeld in :class:`iro.controller.task.Task`.
    This class holds connections to all tasks.
    This class is responsiple to update the status in database of one job and updating the bill.
    '''
    
    synchronized = ["incStatus", "_status"]

    def __init__(self, dbjob, recipients, message, offers):
        """Constructor of ExJob.
        
        :param dbjob: primary key of the job element in database
        :param list recipients: list of all recipients
        :param `iro.model.message.Message` message: message to send
        :param list offers: list of all possible offers to send message over
        """

        self.dbjob = dbjob       #Connection to database job element (id)
        self.message = message
        self.recipients = recipients
        self.offers = offers
        self.tasks={}
        self.c = 0
        self.status = "started"
        log.msg("Job(%s) created."%(self.dbjob))

    def addTask(self,task):
        """adding a task to tasks dict - key is the recipient.
        
        :param `iro.controller.task.Task` task: a task
        """
        self.tasks[task.recipient] = task

    def incStatus(self):
        """increments the processed messages (function is threadsafe)."""
        self.c += 1
        return self.c

    def _status(self, session, status):
        """updates the status of the database object (function is threadsafe).

        :param session: a valid database session
        :param string status: new status
        """
        job = schema.Job.get(session, self.dbjob)
        if self.status == "error":
            return
        elif self.status == "sended" and status != "error":
            return
        job.status = status
        self.status = status
        log.msg("Job(%s) status changed to: %s."%(self.dbjob, status))
        session.commit()

    @dbdefer
    def setStatus(self, session, task, status):
        """callback of one task.
        
        This function  updates the database object and the bill.
        """
        c = self.incStatus()
        job = schema.Job.get(session, self.dbjob)
        
        if c == len(self.recipients):
            self._status(session,"sended") 
        elif job.status in ["started","init"]:
            self._status(session,"sending")

        if status.costs > 0:
            o = schema.Offer.get(session, status.provider.name, status.route, self.message.typ)
            job.messages.append(schema.Message(price=status.costs, isBilled=False, recipient=str(task.recipient), count=status.count, exID=status.exID, date=datetime.today(), offer=o))
        session.commit()
        
        log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route))

    @dbdefer
    def setError(self, session, task, err):
        """errback for one task.

        This function updates the database object.
        """
        self.incStatus()
        if self.status != "error":
            self._status(session,"error")
        log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err)

threadable.synchronize(ExJob)

class ExJobs(dict, MutableMapping):
    """ a dict to handle all jobs.
    """
    @dbdefer
    def create(self, session, user, recipients, message, offers, info=None):
        """creates on new Job.
        
        :param session: a valid session ( created by decorator :func:`iro.model.dbdefer.dbdefer`)
        :param `iro.model.schema.User` user: a user object
        :param list recipients: list of all recipients
        :param `iro.model.message.Message` message: message to send
        :param list offers: a list of offers ( list will be reduced to the allowed offers for the **user** -- using :func:`iro.model.offer.extendProvider`)
        :param string info: a bill group name
        :returns: the new job
        """
        user = session.merge(user)
        job = schema.Job(info=info, status="started")
        user.jobs.append(job)
        session.commit()
        
        o = offer.extendProvider(user, message.typ, offers, session=session)
        self[job.id] = ExJob(job.id, recipients, message, o)
        return self[job.id]

exJobs = ExJobs()
"""the dict of all available jobs."""