# HG changeset patch # User Sandro Knauß # Date 1329670059 -3600 # Node ID c29acd5fb8410ee2ee79cef5d51398ffca77ee78 # Parent 762981cada07cc99f38d442b5a64815d4d980939 splitting tests.job -> test.job, tests.tasks tests.job adding stati (setError,setStatus) tests diff -r 762981cada07 -r c29acd5fb841 iro/model/job.py --- a/iro/model/job.py Sun Feb 19 17:44:44 2012 +0100 +++ b/iro/model/job.py Sun Feb 19 17:47:39 2012 +0100 @@ -1,3 +1,7 @@ +from twisted.python import log +from twisted.python import threadable + +from datetime import datetime from collections import MutableMapping import schema @@ -5,22 +9,62 @@ from .dbdefer import dbdefer class ExJob: - '''One Job is a class that handles one job has multiple tasks''' + '''One Job is a class that handles one job. One Job has multiple tasks.''' + + synchronized = ["incStatus", "_status"] + def __init__(self, dbjob, recipients, message, offers): self.dbjob = dbjob #Connection to mysql job element (id) self.message = message self.recipients = recipients self.offers = offers self.tasks={} + self.c = 0 + self.status = "started" def addTask(self,task): self.tasks[task.recipient] = task - def setStatus(task,status): - pass + def incStatus(self): + self.c += 1 + return self.c + + def _status(self, session, status): + job = schema.Job.get(session, self.dbjob) + if self.status == "error": + return + elif self.status == "sended" and status != "error": + return + job.status = status + self.status = status + session.commit() - def setError(task,err): - pass + @dbdefer + def setStatus(self, session, task, status): + c = self.incStatus() + job = schema.Job.get(session, self.dbjob) + + if job.status in ["started","init"]: + self._status(session,"sending") + if c == len(self.recipients): + self._status(session,"sended") + + if status.costs > 0: + o = schema.Offer.get(session, status.provider.name, status.route, self.message.typ) + job.messages.append(schema.Message(price=status.costs, isBilled=False, recipient=str(task.recipient), date=datetime.today(), offer=o)) + session.commit() + + log.msg("Job(%s) to '%s' ended sucecessfully via %s:%s."%(self.dbjob, task.recipient, status.provider.name,status.route)) + + @dbdefer + def setError(self, session, task, err): + self.incStatus() + if self.status != "error": + self._status(session,"error") + + log.err(_why="Error: Job(%s) to '%s' failed."%(self.dbjob, task.recipient),_stuff=err) + +threadable.synchronize(ExJob) class ExJobs(dict, MutableMapping): diff -r 762981cada07 -r c29acd5fb841 tests/job.py --- a/tests/job.py Sun Feb 19 17:44:44 2012 +0100 +++ b/tests/job.py Sun Feb 19 17:47:39 2012 +0100 @@ -1,24 +1,19 @@ -from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks - -from Queue import deque - -from iro.controller.task import createJob, Task -from iro.controller.pool import taskPool, dbPool +from datetime import datetime +from decimal import Decimal +from mock import patch from iro.model.job import exJobs, ExJob +from iro.model.pool import data +from iro.model.message import SMS from iro.model.status import Status -from iro.model.offer import offers -from iro.model.pool import data - -from iro.model.message import SMS, Fax, Mail from iro.model.schema import Job, User, Offer as DBOffer, Userright -from iro.telnumber import Telnumber +from iro.controller.task import Task -from iro.offer import Offer, Provider +from iro.offer.provider import Provider + +from iro.telnumber import Telnumber from iro.validate import vInteger -from iro.error import NoRouteForTask from .dbtestcase import DBTestCase @@ -26,10 +21,22 @@ def run(self, f,*a,**k): return f(*a,**k) +from twisted.python import log -class exJobsTest(DBTestCase): - '''tests for exJobs''' +class DummyObserver(object): + def __init__(self): + self.e=[] + + def start(self): + log.addObserver(self.emit) + def stop(self): + log.removeObserver(self.emit) + + def emit(self, eventDict): + self.e.append(eventDict) + +class JobTestCase(DBTestCase): def setUp(self): DBTestCase.setUp(self) self.pool = data.pool @@ -41,6 +48,10 @@ self.pool = None DBTestCase.tearDown(self) + +class exJobsTest(JobTestCase): + '''tests for exJobs''' + def testCreate(self): with self.session() as session: u = User(name='test',apikey='abcdef123456789') @@ -99,79 +110,86 @@ def testUnknownExJob(self): self.assertRaises(KeyError,exJobs.__getitem__,'a1234567890') -class TestTasks(DBTestCase): - - def setUp(self): - DBTestCase.setUp(self) - dbPool.start(reactor) - def tearDown(self): - exJobs.clear() - offers.clear() - dbPool.pool.stop() - taskPool.pool.q.queue = deque() - DBTestCase.tearDown(self) - - @inlineCallbacks - def testCreateSMS(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - - job = yield createJob(u,[Telnumber('0123325456')],SMS('sms'),[]) - - self.assertEqual(taskPool.pool.q.qsize(),1) - - self.assertEqual(job.tasks.keys(),[Telnumber('0123325456')]) - self.assertIsInstance(job.tasks[Telnumber('0123325456')], Task) +class StatiTest(JobTestCase): + def setUp(self): + JobTestCase.setUp(self) + self.log = DummyObserver() + self.log.start() - @inlineCallbacks - def testRun(self): with self.session() as session: u = User(name='test',apikey='abcdef123456789') session.add(u) - o=DBOffer(name="test", provider="bla", route="basic", typ="sms") + o=DBOffer(name="test", provider="bla", route="a", typ="sms") u.rights.append(Userright(o)) - - p=Provider(name="p", config={}, typs={"sms":["test",]}) - def send(typ,route,recipient,message): - return Status(provider=p, route=route, error="Error: Test") - p.send=send - offers["test"] = Offer("test",provider=p, route="test", typ="sms") - exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) + self.user = u + self.offer = o - task=Task(Telnumber('123456789'), exjob) - ret = yield task._run() - self.assertIsInstance(ret, Status) - self.assertEqual(ret.provider, p) - self.assertEqual(ret.route, "test") - self.assertEqual(ret.status, Status.ERROR) - self.assertEqual(ret.error, "Error: Test") + self.provider=Provider("bla", [], {"sms":["a","b","c"]}) + self.job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), []) - @inlineCallbacks - def testNoRoute(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - - exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), []) - - task=Task(Telnumber('123456789'), exjob) - d = task._run() - self.assertFailure(d, NoRouteForTask) + def tearDown(self): + self.log.stop() + JobTestCase.tearDown(self) - #def testCreateFax(self): - # with self.session() as session: - # u = User(name='test',apikey='abcdef123456789') - # session.add(u) - # job = createJob(u,[Telnumber('0123325456')],Fax('header','fax',[]),[]) + def testSetError(self): + self.job.setError(Task(Telnumber('123456789'),self),Exception("muhaha")) + errors = self.flushLoggedErrors(Exception) + self.assertEqual(len(errors), 1) + self.assertEqual(self.log.e[0]['why'], "Error: Job(%s) to '0049123456789' failed."%self.job.dbjob) + + with self.session() as session: + u = session.merge(self.user) + job = u.job(self.job.dbjob) + self.assertEqual(job.status,"error") + + def testSetStatus(self): + task = Task(Telnumber('123456789'),self.job) + status = Status(self.provider,"a") + self.job.setStatus(task, status) + + self.assertEqual(self.log.e[0]['message'], ("Job(%s) to '0049123456789' ended sucecessfully via bla:a."%self.job.dbjob,)) + + with self.session() as session: + u = session.merge(self.user) + job = u.job(self.job.dbjob) + self.assertEqual(job.status,"sended") + self.assertEqual(len(job.messages),0) - # self.assertEqual(taskPool.pool.q.qsize(),1) + def testMultipleRecipients(self): + self.job.recipients.append(Telnumber("01234567890")) + task = Task(Telnumber('123456789'),self.job) + status = Status(self.provider,"a") + self.job.setStatus(task, status) + + with self.session() as session: + u = session.merge(self.user) + job = u.job(self.job.dbjob) + self.assertEqual(job.status,"sending") + + - #def testCreateMail(self): - # with self.session() as session: - # u = User(name='test',apikey='abcdef123456789') - # session.add(u) - # job = createJob(u,[],Mail('sub','body','t@t.de'),[]) + @patch("iro.model.job.datetime") + def testCosts(self,p_dt): + p_dt.today.return_value = datetime(2000, 1, 2, 3, 4, 5) + task = Task(Telnumber('123456789'),self.job) + status = Status(self.provider,"a") + status.costs = 0.055 + + self.job.setStatus(task, status) + + with self.session() as session: + u = session.merge(self.user) + o = session.merge(self.offer) + job = u.job(self.job.dbjob) + self.assertEqual(job.status,"sended") + self.assertEqual(len(job.messages),1) + + msg = job.messages[0] + self.assertEqual(msg.price,Decimal('0.0550')) + self.assertEqual(msg.isBilled,False) + self.assertEqual(msg.recipient,str(Telnumber('123456789'))) + self.assertEqual(msg.date,datetime(2000, 1, 2, 3, 4, 5)) + self.assertEqual(msg.offer,o) diff -r 762981cada07 -r c29acd5fb841 tests/task.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/task.py Sun Feb 19 17:47:39 2012 +0100 @@ -0,0 +1,109 @@ +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks + +from Queue import deque + +from iro.model.schema import User, Offer as DBOffer, Userright +from iro.model.message import SMS +from iro.model.status import Status +from iro.model.offer import offers +from iro.model.job import exJobs + +from iro.controller.task import createJob, Task +from iro.controller.pool import taskPool, dbPool + +from iro.offer import Offer, Provider + +from iro.error import NoRouteForTask +from iro.telnumber import Telnumber + +from .dbtestcase import DBTestCase + +class TaskTestCase(DBTestCase): + def setUp(self): + DBTestCase.setUp(self) + dbPool.start(reactor) + + def tearDown(self): + exJobs.clear() + offers.clear() + dbPool.pool.stop() + taskPool.pool.q.queue = deque() + DBTestCase.tearDown(self) + +class TestTasks(TaskTestCase): + + @inlineCallbacks + def testCreateSMS(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + + job = yield createJob(u,[Telnumber('0123325456')],SMS('sms'),[]) + + self.assertEqual(taskPool.pool.q.qsize(),1) + + self.assertEqual(job.tasks.keys(),[Telnumber('0123325456')]) + self.assertIsInstance(job.tasks[Telnumber('0123325456')], Task) + + @inlineCallbacks + def testRun(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="test", provider="bla", route="basic", typ="sms") + u.rights.append(Userright(o)) + + p=Provider(name="p", config={}, typs={"sms":["test",]}) + def send(typ,route,recipient,message): + return Status(provider=p, route=route) + p.send=send + offers["test"] = Offer("test",provider=p, route="test", typ="sms") + + exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) + + task=Task(Telnumber('123456789'), exjob) + ret = yield task._run() + self.assertIsInstance(ret, Status) + self.assertEqual(ret.provider, p) + self.assertEqual(ret.route, "test") + + @inlineCallbacks + def testNoRoute(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + + exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), []) + + task=Task(Telnumber('123456789'), exjob) + d = task._run() + self.assertFailure(d, NoRouteForTask) + + +class CompletetaskTests(TaskTestCase): + def setUp(self): + TaskTestCase.setUp(self) + taskPool.start(reactor) + + def tearDown(self): + taskPool.pool.stop() + TaskTestCase.tearDown(self) + +# def testStart(self): +# pass + + + #def testCreateFax(self): + # with self.session() as session: + # u = User(name='test',apikey='abcdef123456789') + # session.add(u) + # job = createJob(u,[Telnumber('0123325456')],Fax('header','fax',[]),[]) + + # self.assertEqual(taskPool.pool.q.qsize(),1) + + #def testCreateMail(self): + # with self.session() as session: + # u = User(name='test',apikey='abcdef123456789') + # session.add(u) + # job = createJob(u,[],Mail('sub','body','t@t.de'),[])