|
1 from twisted.python import log |
|
2 from twisted.python import threadable |
|
3 |
|
4 from datetime import datetime |
1 from collections import MutableMapping |
5 from collections import MutableMapping |
2 |
6 |
3 import schema |
7 import schema |
4 import offer |
8 import offer |
5 from .dbdefer import dbdefer |
9 from .dbdefer import dbdefer |
6 |
10 |
class ExJob:
    '''One Job is a class that handles one job. One Job has multiple tasks.

    An instance keeps the database id of the job, the message with its
    recipients and offers, the tasks registered per recipient, a counter of
    tasks that have reported a result, and the current job status string.
    The status starts as "started" and is moved on by setStatus/setError.
    '''

    # Method names that threadable.synchronize(ExJob) (after the class body)
    # wraps so that they run under a lock.
    synchronized = ["incStatus", "_status"]

    def __init__(self, dbjob, recipients, message, offers):
        '''Store the job data; start with no tasks and a zero result counter.'''
        self.dbjob = dbjob  # Connection to mysql job element (id)
        self.message = message
        self.recipients = recipients
        self.offers = offers
        self.tasks = {}
        self.c = 0  # how many tasks have reported success or error so far
        self.status = "started"

    def addTask(self, task):
        '''Register task under its recipient, replacing an earlier entry.'''
        self.tasks[task.recipient] = task

    def incStatus(self):
        '''Count one more reported task result and return the new total.'''
        self.c += 1
        return self.c

    def _status(self, session, status):
        '''Set the job status in memory and in the database, then commit.

        The call is a no-op when the job is already in "error", or when it
        is in "sended" and the requested status is anything but "error".
        '''
        if self.status == "error":
            return
        if self.status == "sended" and status != "error":
            return
        # Only look the row up once it is clear that it will be modified;
        # this avoids a needless query while the lock is held.
        job = schema.Job.get(session, self.dbjob)
        job.status = status
        self.status = status
        session.commit()

    @dbdefer
    def setStatus(self, session, task, status):
        '''Record that task finished successfully with the given status.

        Bumps the result counter, moves the job to "sending" while the
        database still shows "started" or "init", and to "sended" once the
        counter equals the number of recipients. When status.costs is
        positive a Message row with that price is appended to the job.

        NOTE(review): session is presumably injected by @dbdefer, so callers
        pass only (task, status) -- confirm against dbdefer.
        '''
        c = self.incStatus()
        job = schema.Job.get(session, self.dbjob)

        if job.status in ["started", "init"]:
            self._status(session, "sending")
        if c == len(self.recipients):
            self._status(session, "sended")

        if status.costs > 0:
            o = schema.Offer.get(session, status.provider.name, status.route, self.message.typ)
            job.messages.append(schema.Message(price=status.costs, isBilled=False, recipient=str(task.recipient), date=datetime.today(), offer=o))
            session.commit()

        log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route))

    @dbdefer
    def setError(self, session, task, err):
        '''Record that task failed with err and put the job into "error".

        The failure still counts towards the result counter. The error is
        always logged, even when the job was already in "error".

        NOTE(review): session is presumably injected by @dbdefer -- confirm.
        '''
        self.incStatus()
        # Unlocked pre-check; _status repeats the test under the lock.
        if self.status != "error":
            self._status(session, "error")

        log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err)


# Wrap the methods listed in ExJob.synchronized so they run under a lock.
threadable.synchronize(ExJob)
24 |
68 |
25 class ExJobs(dict, MutableMapping): |
69 class ExJobs(dict, MutableMapping): |
26 |
70 |
27 @dbdefer |
71 @dbdefer |
28 def create(self, session, user, recipients, message, offers, info=None): |
72 def create(self, session, user, recipients, message, offers, info=None): |