iro/model/job.py
changeset 302 3f4bdea2abbf
parent 294 0e75bd39767d
child 312 42fd5075a5d1
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/model/job.py	Thu Sep 27 17:15:46 2012 +0200
@@ -0,0 +1,146 @@
+# Copyright (c) 2012 netzguerilla.net <iro@netzguerilla.net>
+# 
+# This file is part of Iro.
+# 
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to use,
+# copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
+# #Software, and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+# 
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+# 
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+from twisted.python import log, threadable
+
+from datetime import datetime
+from collections import MutableMapping
+
+import schema
+import offer
+from .dbdefer import dbdefer
+
+class ExJob:
+    ''' A ExJob object represents a message to multiple recipients over multiple offers to send.  
+
+    One single message to one recipient is handeld in :class:`iro.controller.task.Task`.
+    This class holds connections to all tasks.
+    This class is responsiple to update the status in database of one job and updating the bill.
+    '''
+    
+    synchronized = ["incStatus", "_status"]
+
+    def __init__(self, dbjob, recipients, message, offers):
+        """Constructor of ExJob.
+        
+        :param dbjob: primary key of the job element in database
+        :param list recipients: list of all recipients
+        :param `iro.model.message.Message` message: message to send
+        :param list offers: list of all possible offers to send message over
+        """
+
+        self.dbjob = dbjob       #Connection to database job element (id)
+        self.message = message
+        self.recipients = recipients
+        self.offers = offers
+        self.tasks={}
+        self.c = 0
+        self.status = "started"
+        log.msg("Job(%s) created."%(self.dbjob))
+
+    def addTask(self,task):
+        """adding a task to tasks dict - key is the recipient.
+        
+        :param `iro.controller.task.Task` task: a task
+        """
+        self.tasks[task.recipient] = task
+
+    def incStatus(self):
+        """increments the processed messages (function is threadsafe)."""
+        self.c += 1
+        return self.c
+
+    def _status(self, session, status):
+        """updates the status of the database object (function is threadsafe).
+
+        :param session: a valid database session
+        :param string status: new status
+        """
+        job = schema.Job.get(session, self.dbjob)
+        if self.status == "error":
+            return
+        elif self.status == "sended" and status != "error":
+            return
+        job.status = status
+        self.status = status
+        log.msg("Job(%s) status changed to: %s."%(self.dbjob, status))
+        session.commit()
+
+    @dbdefer
+    def setStatus(self, session, task, status):
+        """callback of one task.
+        
+        This function  updates the database object and the bill.
+        """
+        c = self.incStatus()
+        job = schema.Job.get(session, self.dbjob)
+        
+        if c == len(self.recipients):
+            self._status(session,"sended") 
+        elif job.status in ["started","init"]:
+            self._status(session,"sending")
+
+        if status.costs > 0:
+            o = schema.Offer.get(session, status.provider.name, status.route, self.message.typ)
+            job.messages.append(schema.Message(price=status.costs, isBilled=False, recipient=str(task.recipient), count=status.count, exID=status.exID, date=datetime.today(), offer=o))
+        session.commit()
+        
+        log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route))
+
+    @dbdefer
+    def setError(self, session, task, err):
+        """errback for one task.
+
+        This function updates the database object.
+        """
+        self.incStatus()
+        if self.status != "error":
+            self._status(session,"error")
+        log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err)
+
+threadable.synchronize(ExJob)
+
+class ExJobs(dict, MutableMapping):
+    """ a dict to handle all jobs.
+    """
+    @dbdefer
+    def create(self, session, user, recipients, message, offers, info=None):
+        """creates on new Job.
+        
+        :param session: a valid session ( created by decorator :func:`iro.model.dbdefer.dbdefer`)
+        :param `iro.model.schema.User` user: a user object
+        :param list recipients: list of all recipients
+        :param `iro.model.message.Message` message: message to send
+        :param list offers: a list of offers ( list will be reduced to the allowed offers for the **user** -- using :func:`iro.model.offer.extendProvider`)
+        :param string info: a bill group name
+        :returns: the new job
+        """
+        user = session.merge(user)
+        job = schema.Job(info=info, status="started")
+        user.jobs.append(job)
+        session.commit()
+        
+        o = offer.extendProvider(user, message.typ, offers, session=session)
+        self[job.id] = ExJob(job.id, recipients, message, o)
+        return self[job.id]
+
+exJobs = ExJobs()
+"""the dict of all available jobs."""