iro/controller/task.py
branchdevel
changeset 234 08fcc2b6df99
parent 217 d755b2e0cc0b
child 238 c36b117f7400
equal deleted inserted replaced
233:34435357dc8a 234:08fcc2b6df99
     1 from functools import partial
     1 from functools import partial
     2 
     2 
     3 from twisted.internet.defer import inlineCallbacks, returnValue 
     3 from twisted.internet.defer import inlineCallbacks, returnValue, maybeDeferred, Deferred
     4 
     4 
     5 from ..error import NoRouteForTask, RejectRecipient
     5 from ..error import NoRouteForTask, RejectRecipient
     6 
     6 
     7 from ..model.offer import offers
     7 from ..model.offer import offers
     8 from ..model.job import exJobs
     8 from ..model.job import exJobs
    21 
    21 
    22     def setStatus(self,status):
    22     def setStatus(self,status):
    23         self.status = status
    23         self.status = status
    24         return status
    24         return status
    25 
    25 
    26     def setError(self,error):
    26     def setError(self, error):
    27         self.status = error
    27         self.status = error
    28         self.error = True
    28         self.error = True
    29         return error
    29         return error
    30 
    30 
    31     def start(self):
    31     def start(self):
    32         self.d = taskPool.run(self._run)
    32         self.d = Deferred()
    33         self.d.addCallback(self.setStatus)
    33         self.d.addCallback(self.setStatus)
    34         self.d.addCallback(partial(self.job.setStatus,self))
    34         self.d.addCallback(partial(self.job.setStatus,self))
    35         self.d.addErrback(self.setError)
    35         self.d.addErrback(self.setError)
    36         self.d.addErrback(partial(self.job.setError,self))
    36         self.d.addErrback(partial(self.job.setError,self))
       
    37         taskPool.run(self._run)
    37         return self.d
    38         return self.d
    38 
    39 
    39     @inlineCallbacks
       
    40     def _run(self):
    40     def _run(self):
    41         for offer in self.job.offers:
    41         os= (i for i in self.job.offers)
       
    42         def n():
    42             try:
    43             try:
    43                 ret = yield offers[offer](self.recipient,self.job.message)
    44                 offer = os.next()
    44                 returnValue(ret)
    45                 d = maybeDeferred(offers[offer],self.recipient,self.job.message)
    45             except RejectRecipient:
    46                 d.addCallback(self.d.callback)
    46                 continue
    47                 d.addErrback(addErr)
    47         else:
    48                 d.addErrback(self.d.errback)
    48             raise NoRouteForTask()
    49                 return d
       
    50             except StopIteration:
       
    51                 self.d.errback(NoRouteForTask())
       
    52 
       
    53         def addErr(failure):
       
    54             failure.trap(RejectRecipient)
       
    55             return n()
       
    56         
       
    57         n()
    49 
    58 
    50 
    59 
    51 @inlineCallbacks
    60 @inlineCallbacks
    52 def createJob(user,recipients, msg, offers, info=None):
    61 def createJob(user,recipients, msg, offers, info=None):
    53     job = yield exJobs.create(user, recipients, msg, offers, info)
    62     job = yield exJobs.create(user, recipients, msg, offers, info)