splitting tests.job -> test.job, tests.tasks
tests.job adding stati (setError,setStatus) tests
--- a/iro/model/job.py Sun Feb 19 17:44:44 2012 +0100
+++ b/iro/model/job.py Sun Feb 19 17:47:39 2012 +0100
@@ -1,3 +1,7 @@
+from twisted.python import log
+from twisted.python import threadable
+
+from datetime import datetime
from collections import MutableMapping
import schema
@@ -5,22 +9,62 @@
from .dbdefer import dbdefer
class ExJob:
- '''One Job is a class that handles one job has multiple tasks'''
+ '''One Job is a class that handles one job. One Job has multiple tasks.'''
+
+ synchronized = ["incStatus", "_status"]
+
def __init__(self, dbjob, recipients, message, offers):
self.dbjob = dbjob #Connection to mysql job element (id)
self.message = message
self.recipients = recipients
self.offers = offers
self.tasks={}
+ self.c = 0
+ self.status = "started"
def addTask(self,task):
self.tasks[task.recipient] = task
- def setStatus(task,status):
- pass
+ def incStatus(self):
+ self.c += 1
+ return self.c
+
+ def _status(self, session, status):
+ job = schema.Job.get(session, self.dbjob)
+ if self.status == "error":
+ return
+ elif self.status == "sended" and status != "error":
+ return
+ job.status = status
+ self.status = status
+ session.commit()
- def setError(task,err):
- pass
+ @dbdefer
+ def setStatus(self, session, task, status):
+ c = self.incStatus()
+ job = schema.Job.get(session, self.dbjob)
+
+ if job.status in ["started","init"]:
+ self._status(session,"sending")
+ if c == len(self.recipients):
+ self._status(session,"sended")
+
+ if status.costs > 0:
+ o = schema.Offer.get(session, status.provider.name, status.route, self.message.typ)
+ job.messages.append(schema.Message(price=status.costs, isBilled=False, recipient=str(task.recipient), date=datetime.today(), offer=o))
+ session.commit()
+
+ log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route))
+
+ @dbdefer
+ def setError(self, session, task, err):
+ self.incStatus()
+ if self.status != "error":
+ self._status(session,"error")
+
+ log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err)
+
+threadable.synchronize(ExJob)
class ExJobs(dict, MutableMapping):
--- a/tests/job.py Sun Feb 19 17:44:44 2012 +0100
+++ b/tests/job.py Sun Feb 19 17:47:39 2012 +0100
@@ -1,24 +1,19 @@
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
-
-from Queue import deque
-
-from iro.controller.task import createJob, Task
-from iro.controller.pool import taskPool, dbPool
+from datetime import datetime
+from decimal import Decimal
+from mock import patch
from iro.model.job import exJobs, ExJob
+from iro.model.pool import data
+from iro.model.message import SMS
from iro.model.status import Status
-from iro.model.offer import offers
-from iro.model.pool import data
-
-from iro.model.message import SMS, Fax, Mail
from iro.model.schema import Job, User, Offer as DBOffer, Userright
-from iro.telnumber import Telnumber
+from iro.controller.task import Task
-from iro.offer import Offer, Provider
+from iro.offer.provider import Provider
+
+from iro.telnumber import Telnumber
from iro.validate import vInteger
-from iro.error import NoRouteForTask
from .dbtestcase import DBTestCase
@@ -26,10 +21,22 @@
def run(self, f,*a,**k):
return f(*a,**k)
+from twisted.python import log
-class exJobsTest(DBTestCase):
- '''tests for exJobs'''
+class DummyObserver(object):
+ def __init__(self):
+ self.e=[]
+
+ def start(self):
+ log.addObserver(self.emit)
+ def stop(self):
+ log.removeObserver(self.emit)
+
+ def emit(self, eventDict):
+ self.e.append(eventDict)
+
+class JobTestCase(DBTestCase):
def setUp(self):
DBTestCase.setUp(self)
self.pool = data.pool
@@ -41,6 +48,10 @@
self.pool = None
DBTestCase.tearDown(self)
+
+class exJobsTest(JobTestCase):
+ '''tests for exJobs'''
+
def testCreate(self):
with self.session() as session:
u = User(name='test',apikey='abcdef123456789')
@@ -99,79 +110,86 @@
def testUnknownExJob(self):
self.assertRaises(KeyError,exJobs.__getitem__,'a1234567890')
-class TestTasks(DBTestCase):
-
- def setUp(self):
- DBTestCase.setUp(self)
- dbPool.start(reactor)
- def tearDown(self):
- exJobs.clear()
- offers.clear()
- dbPool.pool.stop()
- taskPool.pool.q.queue = deque()
- DBTestCase.tearDown(self)
-
- @inlineCallbacks
- def testCreateSMS(self):
- with self.session() as session:
- u = User(name='test',apikey='abcdef123456789')
- session.add(u)
-
- job = yield createJob(u,[Telnumber('0123325456')],SMS('sms'),[])
-
- self.assertEqual(taskPool.pool.q.qsize(),1)
-
- self.assertEqual(job.tasks.keys(),[Telnumber('0123325456')])
- self.assertIsInstance(job.tasks[Telnumber('0123325456')], Task)
+class StatiTest(JobTestCase):
+ def setUp(self):
+ JobTestCase.setUp(self)
+ self.log = DummyObserver()
+ self.log.start()
- @inlineCallbacks
- def testRun(self):
with self.session() as session:
u = User(name='test',apikey='abcdef123456789')
session.add(u)
- o=DBOffer(name="test", provider="bla", route="basic", typ="sms")
+ o=DBOffer(name="test", provider="bla", route="a", typ="sms")
u.rights.append(Userright(o))
-
- p=Provider(name="p", config={}, typs={"sms":["test",]})
- def send(typ,route,recipient,message):
- return Status(provider=p, route=route, error="Error: Test")
- p.send=send
- offers["test"] = Offer("test",provider=p, route="test", typ="sms")
- exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test'])
+ self.user = u
+ self.offer = o
- task=Task(Telnumber('123456789'), exjob)
- ret = yield task._run()
- self.assertIsInstance(ret, Status)
- self.assertEqual(ret.provider, p)
- self.assertEqual(ret.route, "test")
- self.assertEqual(ret.status, Status.ERROR)
- self.assertEqual(ret.error, "Error: Test")
+ self.provider=Provider("bla", [], {"sms":["a","b","c"]})
+ self.job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), [])
- @inlineCallbacks
- def testNoRoute(self):
- with self.session() as session:
- u = User(name='test',apikey='abcdef123456789')
- session.add(u)
-
- exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), [])
-
- task=Task(Telnumber('123456789'), exjob)
- d = task._run()
- self.assertFailure(d, NoRouteForTask)
+ def tearDown(self):
+ self.log.stop()
+ JobTestCase.tearDown(self)
- #def testCreateFax(self):
- # with self.session() as session:
- # u = User(name='test',apikey='abcdef123456789')
- # session.add(u)
- # job = createJob(u,[Telnumber('0123325456')],Fax('header','fax',[]),[])
+ def testSetError(self):
+ self.job.setError(Task(Telnumber('123456789'),self),Exception("muhaha"))
+ errors = self.flushLoggedErrors(Exception)
+ self.assertEqual(len(errors), 1)
+ self.assertEqual(self.log.e[0]['why'], "Error: Job(%s) to '0049123456789' failed."%self.job.dbjob)
+
+ with self.session() as session:
+ u = session.merge(self.user)
+ job = u.job(self.job.dbjob)
+ self.assertEqual(job.status,"error")
+
+ def testSetStatus(self):
+ task = Task(Telnumber('123456789'),self.job)
+ status = Status(self.provider,"a")
+ self.job.setStatus(task, status)
+
+ self.assertEqual(self.log.e[0]['message'], ("Job(%s) to '0049123456789' ended sucecessfully via bla:a."%self.job.dbjob,))
+
+ with self.session() as session:
+ u = session.merge(self.user)
+ job = u.job(self.job.dbjob)
+ self.assertEqual(job.status,"sended")
+ self.assertEqual(len(job.messages),0)
- # self.assertEqual(taskPool.pool.q.qsize(),1)
+ def testMultipleRecipients(self):
+ self.job.recipients.append(Telnumber("01234567890"))
+ task = Task(Telnumber('123456789'),self.job)
+ status = Status(self.provider,"a")
+ self.job.setStatus(task, status)
+
+ with self.session() as session:
+ u = session.merge(self.user)
+ job = u.job(self.job.dbjob)
+ self.assertEqual(job.status,"sending")
+
+
- #def testCreateMail(self):
- # with self.session() as session:
- # u = User(name='test',apikey='abcdef123456789')
- # session.add(u)
- # job = createJob(u,[],Mail('sub','body','t@t.de'),[])
+ @patch("iro.model.job.datetime")
+ def testCosts(self,p_dt):
+ p_dt.today.return_value = datetime(2000, 1, 2, 3, 4, 5)
+ task = Task(Telnumber('123456789'),self.job)
+ status = Status(self.provider,"a")
+ status.costs = 0.055
+
+ self.job.setStatus(task, status)
+
+ with self.session() as session:
+ u = session.merge(self.user)
+ o = session.merge(self.offer)
+ job = u.job(self.job.dbjob)
+ self.assertEqual(job.status,"sended")
+ self.assertEqual(len(job.messages),1)
+
+ msg = job.messages[0]
+ self.assertEqual(msg.price,Decimal('0.0550'))
+ self.assertEqual(msg.isBilled,False)
+ self.assertEqual(msg.recipient,str(Telnumber('123456789')))
+ self.assertEqual(msg.date,datetime(2000, 1, 2, 3, 4, 5))
+ self.assertEqual(msg.offer,o)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/task.py Sun Feb 19 17:47:39 2012 +0100
@@ -0,0 +1,109 @@
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+
+from Queue import deque
+
+from iro.model.schema import User, Offer as DBOffer, Userright
+from iro.model.message import SMS
+from iro.model.status import Status
+from iro.model.offer import offers
+from iro.model.job import exJobs
+
+from iro.controller.task import createJob, Task
+from iro.controller.pool import taskPool, dbPool
+
+from iro.offer import Offer, Provider
+
+from iro.error import NoRouteForTask
+from iro.telnumber import Telnumber
+
+from .dbtestcase import DBTestCase
+
+class TaskTestCase(DBTestCase):
+ def setUp(self):
+ DBTestCase.setUp(self)
+ dbPool.start(reactor)
+
+ def tearDown(self):
+ exJobs.clear()
+ offers.clear()
+ dbPool.pool.stop()
+ taskPool.pool.q.queue = deque()
+ DBTestCase.tearDown(self)
+
+class TestTasks(TaskTestCase):
+
+ @inlineCallbacks
+ def testCreateSMS(self):
+ with self.session() as session:
+ u = User(name='test',apikey='abcdef123456789')
+ session.add(u)
+
+ job = yield createJob(u,[Telnumber('0123325456')],SMS('sms'),[])
+
+ self.assertEqual(taskPool.pool.q.qsize(),1)
+
+ self.assertEqual(job.tasks.keys(),[Telnumber('0123325456')])
+ self.assertIsInstance(job.tasks[Telnumber('0123325456')], Task)
+
+ @inlineCallbacks
+ def testRun(self):
+ with self.session() as session:
+ u = User(name='test',apikey='abcdef123456789')
+ session.add(u)
+ o=DBOffer(name="test", provider="bla", route="basic", typ="sms")
+ u.rights.append(Userright(o))
+
+ p=Provider(name="p", config={}, typs={"sms":["test",]})
+ def send(typ,route,recipient,message):
+ return Status(provider=p, route=route)
+ p.send=send
+ offers["test"] = Offer("test",provider=p, route="test", typ="sms")
+
+ exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test'])
+
+ task=Task(Telnumber('123456789'), exjob)
+ ret = yield task._run()
+ self.assertIsInstance(ret, Status)
+ self.assertEqual(ret.provider, p)
+ self.assertEqual(ret.route, "test")
+
+ @inlineCallbacks
+ def testNoRoute(self):
+ with self.session() as session:
+ u = User(name='test',apikey='abcdef123456789')
+ session.add(u)
+
+ exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), [])
+
+ task=Task(Telnumber('123456789'), exjob)
+ d = task._run()
+ self.assertFailure(d, NoRouteForTask)
+
+
+class CompletetaskTests(TaskTestCase):
+ def setUp(self):
+ TaskTestCase.setUp(self)
+ taskPool.start(reactor)
+
+ def tearDown(self):
+ taskPool.pool.stop()
+ TaskTestCase.tearDown(self)
+
+# def testStart(self):
+# pass
+
+
+ #def testCreateFax(self):
+ # with self.session() as session:
+ # u = User(name='test',apikey='abcdef123456789')
+ # session.add(u)
+ # job = createJob(u,[Telnumber('0123325456')],Fax('header','fax',[]),[])
+
+ # self.assertEqual(taskPool.pool.q.qsize(),1)
+
+ #def testCreateMail(self):
+ # with self.session() as session:
+ # u = User(name='test',apikey='abcdef123456789')
+ # session.add(u)
+ # job = createJob(u,[],Mail('sub','body','t@t.de'),[])