6 import schema |
6 import schema |
7 import offer |
7 import offer |
8 from .dbdefer import dbdefer |
8 from .dbdefer import dbdefer |
9 |
9 |
10 class ExJob: |
10 class ExJob: |
11 '''One Job is a class that handles one job. One Job has multiple tasks.''' |
11 ''' An ExJob object represents one message that is sent to multiple recipients over multiple offers. |
|
12 |
|
13 One single message to one recipient is handled in :class:`iro.controller.task.Task`. |
|
14 This class holds connections to all tasks. |
|
15 This class is responsible for updating the status of one job in the database and for updating the bill. |
|
16 ''' |
12 |
17 |
13 synchronized = ["incStatus", "_status"] |
18 synchronized = ["incStatus", "_status"] |
14 |
19 |
def __init__(self, dbjob, recipients, message, offers):
    """Constructor of ExJob.

    :param dbjob: primary key of the job element in database
    :param list recipients: list of all recipients
    :param `iro.model.message.Message` message: message to send
    :param list offers: list of all possible offers to send message over
    """
    self.dbjob = dbjob  # id of the job element in the database
    self.message = message
    self.recipients = recipients
    self.offers = offers
    self.tasks = {}  # recipient -> task, filled by addTask
    self.c = 0  # counter returned and incremented by incStatus
    self.status = "started"
    log.msg("Job(%s) created."%(self.dbjob))
24 |
37 |
def addTask(self, task):
    """Add a task to the tasks dict; the key is the recipient of the task.

    A task that is added for an already known recipient replaces the
    previously stored one.

    :param `iro.controller.task.Task` task: a task
    """
    self.tasks[task.recipient] = task
27 |
44 |
def incStatus(self):
    """Increment the counter of processed messages and return the new value.

    The method is listed in ``synchronized``; the original docstring
    describes it as threadsafe.

    :returns: the counter value after the increment
    """
    self.c += 1
    return self.c
31 |
49 |
32 def _status(self, session, status): |
50 def _status(self, session, status): |
|
51 """updates the status of the database object (function is threadsafe). |
|
52 |
|
53 :param session: a valid database session |
|
54 :param string status: new status |
|
55 """ |
33 job = schema.Job.get(session, self.dbjob) |
56 job = schema.Job.get(session, self.dbjob) |
34 if self.status == "error": |
57 if self.status == "error": |
35 return |
58 return |
36 elif self.status == "sended" and status != "error": |
59 elif self.status == "sended" and status != "error": |
37 return |
60 return |
40 log.msg("Job(%s) status changed to: %s."%(self.dbjob, status)) |
63 log.msg("Job(%s) status changed to: %s."%(self.dbjob, status)) |
41 session.commit() |
64 session.commit() |
42 |
65 |
43 @dbdefer |
66 @dbdefer |
44 def setStatus(self, session, task, status): |
67 def setStatus(self, session, task, status): |
|
68 """callback of one task. |
|
69 |
|
70 This function updates the database object and the bill. |
|
71 """ |
45 c = self.incStatus() |
72 c = self.incStatus() |
46 job = schema.Job.get(session, self.dbjob) |
73 job = schema.Job.get(session, self.dbjob) |
47 |
74 |
48 if c == len(self.recipients): |
75 if c == len(self.recipients): |
49 self._status(session,"sended") |
76 self._status(session,"sended") |
57 |
84 |
58 log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route)) |
85 log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route)) |
59 |
86 |
60 @dbdefer |
87 @dbdefer |
61 def setError(self, session, task, err): |
88 def setError(self, session, task, err): |
|
89 """errback for one task. |
|
90 |
|
91 This function updates the database object. |
|
92 """ |
62 self.incStatus() |
93 self.incStatus() |
63 if self.status != "error": |
94 if self.status != "error": |
64 self._status(session,"error") |
95 self._status(session,"error") |
65 log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err) |
96 log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err) |
66 |
97 |
# NOTE(review): presumably twisted.python.threadable -- makes the methods
# named in ExJob.synchronized synchronized; confirm against the file imports.
threadable.synchronize(ExJob)
68 |
99 |
class ExJobs(dict, MutableMapping):
    """A dict to handle all jobs; the key is the database id of the job."""

    @dbdefer
    def create(self, session, user, recipients, message, offers, info=None):
        """Create one new job and store it in this dict.

        :param session: a valid session ( created by decorator :func:`iro.model.dbdefer.dbdefer`)
        :param `iro.model.schema.User` user: a user object
        :param list recipients: list of all recipients
        :param `iro.model.message.Message` message: message to send
        :param list offers: a list of offers ( list will be reduced to the allowed offers for the **user** -- using :func:`iro.model.offer.extendProvider`)
        :param info: handed over as ``info`` to the new :class:`schema.Job`
        :returns: the new job
        """
        user = session.merge(user)
        job = schema.Job(info=info, status="started")
        user.jobs.append(job)
        # presumably the commit is what assigns job.id -- confirm
        session.commit()

        o = offer.extendProvider(user, message.typ, offers, session=session)
        exjob = ExJob(job.id, recipients, message, o)
        self[job.id] = exjob
        return exjob
81 |
122 |
exJobs = ExJobs()
"""the dict of all available jobs."""