iro/model/job.py
branchdevel
changeset 258 0a5eb5aac0be
parent 245 4526747a42ca
child 263 52284710c0b4
equal deleted inserted replaced
257:31114e40178d 258:0a5eb5aac0be
     6 import schema
     6 import schema
     7 import offer
     7 import offer
     8 from .dbdefer import dbdefer
     8 from .dbdefer import dbdefer
     9 
     9 
    10 class ExJob:
    10 class ExJob:
    11     '''One Job is a class that handles one job. One Job has multiple tasks.'''
    11     ''' A ExJob object represents a message to multiple recipients over multiple offers to send.  
       
    12 
       
    13     One single message to one recipient is handeld in :class:`iro.controller.task.Task`.
       
    14     This class holds connections to all tasks.
       
    15     This class is responsiple to update the status in database of one job and updating the bill.
       
    16     '''
    12     
    17     
    13     synchronized = ["incStatus", "_status"]
    18     synchronized = ["incStatus", "_status"]
    14 
    19 
    15     def __init__(self, dbjob, recipients, message, offers):
    20     def __init__(self, dbjob, recipients, message, offers):
    16         self.dbjob = dbjob       #Connection to mysql job element (id)
    21         """Constructor of ExJob.
       
    22         
       
    23         :param dbjob: primary key of the job element in database
       
    24         :param list recipients: list of all recipients
       
    25         :param `iro.model.message.Message` message: message to send
       
    26         :param list offers: list of all possible offers to send message over
       
    27         """
       
    28 
       
    29         self.dbjob = dbjob       #Connection to database job element (id)
    17         self.message = message
    30         self.message = message
    18         self.recipients = recipients
    31         self.recipients = recipients
    19         self.offers = offers
    32         self.offers = offers
    20         self.tasks={}
    33         self.tasks={}
    21         self.c = 0
    34         self.c = 0
    22         self.status = "started"
    35         self.status = "started"
    23         log.msg("Job(%s) created."%(self.dbjob))
    36         log.msg("Job(%s) created."%(self.dbjob))
    24 
    37 
    25     def addTask(self,task):
    38     def addTask(self,task):
       
    39         """adding a task to tasks dict - key is the recipient.
       
    40         
       
    41         :param `iro.controller.task.Task` task: a task
       
    42         """
    26         self.tasks[task.recipient] = task
    43         self.tasks[task.recipient] = task
    27 
    44 
    28     def incStatus(self):
    45     def incStatus(self):
       
    46         """increments the processed messages (function is threadsafe)."""
    29         self.c += 1
    47         self.c += 1
    30         return self.c
    48         return self.c
    31 
    49 
    32     def _status(self, session, status):
    50     def _status(self, session, status):
       
    51         """updates the status of the database object (function is threadsafe).
       
    52 
       
    53         :param session: a valid database session
       
    54         :param string status: new status
       
    55         """
    33         job = schema.Job.get(session, self.dbjob)
    56         job = schema.Job.get(session, self.dbjob)
    34         if self.status == "error":
    57         if self.status == "error":
    35             return
    58             return
    36         elif self.status == "sended" and status != "error":
    59         elif self.status == "sended" and status != "error":
    37             return
    60             return
    40         log.msg("Job(%s) status changed to: %s."%(self.dbjob, status))
    63         log.msg("Job(%s) status changed to: %s."%(self.dbjob, status))
    41         session.commit()
    64         session.commit()
    42 
    65 
    43     @dbdefer
    66     @dbdefer
    44     def setStatus(self, session, task, status):
    67     def setStatus(self, session, task, status):
       
    68         """callback of one task.
       
    69         
       
    70         This function  updates the database object and the bill.
       
    71         """
    45         c = self.incStatus()
    72         c = self.incStatus()
    46         job = schema.Job.get(session, self.dbjob)
    73         job = schema.Job.get(session, self.dbjob)
    47         
    74         
    48         if c == len(self.recipients):
    75         if c == len(self.recipients):
    49             self._status(session,"sended") 
    76             self._status(session,"sended") 
    57         
    84         
    58         log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route))
    85         log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route))
    59 
    86 
    60     @dbdefer
    87     @dbdefer
    61     def setError(self, session, task, err):
    88     def setError(self, session, task, err):
       
    89         """errback for one task.
       
    90 
       
    91         This function updates the database object.
       
    92         """
    62         self.incStatus()
    93         self.incStatus()
    63         if self.status != "error":
    94         if self.status != "error":
    64             self._status(session,"error")
    95             self._status(session,"error")
    65         log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err)
    96         log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err)
    66 
    97 
    67 threadable.synchronize(ExJob)
    98 threadable.synchronize(ExJob)
    68 
    99 
    69 class ExJobs(dict, MutableMapping):
   100 class ExJobs(dict, MutableMapping):
    70 
   101     """ a dict to handle all jobs.
       
   102     """
    71     @dbdefer
   103     @dbdefer
    72     def create(self, session, user, recipients, message, offers, info=None):
   104     def create(self, session, user, recipients, message, offers, info=None):
       
   105         """creates on new Job.
       
   106         
       
   107         :param session: a valid session ( created by decorator :func:`iro.model.dbdefer.dbdefer`)
       
   108         :param `iro.model.schema.User` user: a user object
       
   109         :param list recipients: list of all recipients
       
   110         :param `iro.model.message.Message` message: message to send
       
   111         :param list offers: a list of offers ( list will be reduced to the allowed offers for the **user** -- using :func:`iro.model.offer.extendProvider`)
       
   112         :returns: the new job
       
   113         """
    73         user = session.merge(user)
   114         user = session.merge(user)
    74         job = schema.Job(info=info, status="started")
   115         job = schema.Job(info=info, status="started")
    75         user.jobs.append(job)
   116         user.jobs.append(job)
    76         session.commit()
   117         session.commit()
    77         
   118         
    78         o = offer.extendProvider(user, message.typ, offers, session=session)
   119         o = offer.extendProvider(user, message.typ, offers, session=session)
    79         self[job.id] = ExJob(job.id, recipients, message, o)
   120         self[job.id] = ExJob(job.id, recipients, message, o)
    80         return self[job.id]
   121         return self[job.id]
    81 
   122 
    82 exJobs = ExJobs()
   123 exJobs = ExJobs()
       
   124 """the dict of all available jobs."""