|
1 # Copyright (c) 2012 netzguerilla.net <iro@netzguerilla.net> |
|
2 # |
|
3 # This file is part of Iro. |
|
4 # |
|
5 # Permission is hereby granted, free of charge, to any person obtaining a copy of |
|
6 # this software and associated documentation files (the "Software"), to deal in |
|
7 # the Software without restriction, including without limitation the rights to use, |
|
8 # copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the |
|
9 # Software, and to permit persons to whom the Software is furnished to do so, |
|
10 # subject to the following conditions: |
|
11 # |
|
12 # The above copyright notice and this permission notice shall be included in |
|
13 # all copies or substantial portions of the Software. |
|
14 # |
|
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, |
|
16 # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A |
|
17 # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT |
|
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
|
19 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE |
|
20 # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|
21 |
|
22 from twisted.python import log, threadable |
|
23 |
|
24 from datetime import datetime |
|
25 from collections import MutableMapping |
|
26 |
|
27 import schema |
|
28 import offer |
|
29 from .dbdefer import dbdefer |
|
30 |
|
class ExJob:
    """One message that is sent to several recipients over several offers.

    A single message to a single recipient is handled by
    :class:`iro.controller.task.Task`; this class keeps a reference to every
    such task.  It is responsible for updating the status of one job in the
    database and for updating the bill.
    """

    # Names of the methods that ``threadable.synchronize`` (called directly
    # after the class body) makes synchronized.
    synchronized = ["incStatus", "_status"]

    def __init__(self, dbjob, recipients, message, offers):
        """Create the in-memory companion of a database job.

        :param dbjob: primary key of the job element in database
        :param list recipients: list of all recipients
        :param `iro.model.message.Message` message: message to send
        :param list offers: list of all possible offers to send message over
        """
        self.dbjob = dbjob  # id of the job element in the database
        self.recipients = recipients
        self.message = message
        self.offers = offers
        self.tasks = {}  # recipient -> task, filled by addTask()
        self.c = 0  # number of tasks that have reported back so far
        self.status = "started"
        log.msg("Job(%s) created." % self.dbjob)

    def addTask(self, task):
        """Store *task* in the ``tasks`` dict, keyed by its recipient.

        :param `iro.controller.task.Task` task: a task
        """
        recipient = task.recipient
        self.tasks[recipient] = task

    def incStatus(self):
        """Count one more processed message and return the new total.

        Listed in ``synchronized``, so the increment is not interleaved
        between threads.
        """
        processed = self.c + 1
        self.c = processed
        return processed

    def _status(self, session, status):
        """Write a new status to the database job and to this object.

        Listed in ``synchronized``.  Nothing is changed when the job is
        already in state ``"error"``, or when it is in state ``"sended"`` and
        the new status is anything but ``"error"``.

        :param session: a valid database session
        :param string status: new status
        """
        # The row is fetched before the checks, exactly like the state it
        # guards, so the database access pattern does not depend on the state.
        dbjob = schema.Job.get(session, self.dbjob)

        current = self.status
        is_final = current == "error" or (
            current == "sended" and status != "error")
        if not is_final:
            dbjob.status = status
            self.status = status
            log.msg("Job(%s) status changed to: %s." % (self.dbjob, status))
            session.commit()

    @dbdefer
    def setStatus(self, session, task, status):
        """Callback of one task that finished successfully.

        Updates the status of the database job and, when the send caused
        costs, appends a message entry to the job for billing.

        :param session: a valid database session
        :param task: the task that finished; its ``recipient`` is recorded
        :param status: result of the task; ``costs``, ``count``, ``exID``,
            ``route`` and ``provider.name`` are read from it
        """
        processed = self.incStatus()
        dbjob = schema.Job.get(session, self.dbjob)

        everything_processed = processed == len(self.recipients)
        if everything_processed:
            self._status(session, "sended")
        elif dbjob.status in ("started", "init"):
            self._status(session, "sending")

        if status.costs > 0:
            used_offer = schema.Offer.get(
                session, status.provider.name, status.route, self.message.typ)
            # Touch the collection before building the new row, so attribute
            # access happens in the same order as a direct
            # ``job.messages.append(Message(...))`` would do it.
            job_messages = dbjob.messages
            entry = schema.Message(
                price=status.costs,
                isBilled=False,
                recipient=str(task.recipient),
                count=status.count,
                exID=status.exID,
                date=datetime.today(),
                offer=used_offer)
            job_messages.append(entry)
            session.commit()

        log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s." % (
            self.dbjob, task.recipient, status.provider.name, status.route))

    @dbdefer
    def setError(self, session, task, err):
        """Errback of one task.

        Counts the task as processed, switches the job to state ``"error"``
        unless it already is in that state, and logs the failure.

        :param session: a valid database session
        :param task: the task that failed
        :param err: the failure, handed on to ``log.err``
        """
        self.incStatus()

        needs_update = self.status != "error"
        if needs_update:
            self._status(session, "error")

        why = "Error: Job(%s) to '%s' failed." % (self.dbjob, task.recipient)
        log.err(_why=why, _stuff=err)


# Make the methods named in ExJob.synchronized synchronized.
threadable.synchronize(ExJob)
|
120 |
|
class ExJobs(dict, MutableMapping):
    """Dict that holds all jobs; ``create`` stores each job under its database id."""

    @dbdefer
    def create(self, session, user, recipients, message, offers, info=None):
        """Create one new job, store it in this dict and return it.

        :param session: a valid session ( created by decorator :func:`iro.model.dbdefer.dbdefer`)
        :param `iro.model.schema.User` user: a user object
        :param list recipients: list of all recipients
        :param `iro.model.message.Message` message: message to send
        :param list offers: a list of offers ( list will be reduced to the allowed offers for the **user** -- using :func:`iro.model.offer.extendProvider`)
        :param string info: a bill group name
        :returns: the new job
        """
        # Work on the copy of the user that belongs to this session.
        owner = session.merge(user)

        dbjob = schema.Job(info=info, status="started")
        owner.jobs.append(dbjob)
        session.commit()

        allowed_offers = offer.extendProvider(
            owner, message.typ, offers, session=session)

        job_id = dbjob.id
        self[job_id] = ExJob(job_id, recipients, message, allowed_offers)
        return self[job_id]


exJobs = ExJobs()
"""the dict of all available jobs."""