from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from Queue import deque
from iro.controller.task import createJob, Task
from iro.controller.pool import taskPool, dbPool
from iro.model.job import exJobs, ExJob, Status
from iro.model.pool import data
from iro.model.message import SMS, Fax, Mail
from iro.model.schema import Job, User
from iro.telnumber import Telnumber
from iro.validate import vInteger
from iro.error import NoRoute
from .dbtestcase import DBTestCase
class DumpPool:
    '''Stand-in pool whose run() executes the callable directly in the caller.'''

    def run(self, f, *args, **kwargs):
        '''Call f with the given positional and keyword arguments and return its result.'''
        result = f(*args, **kwargs)
        return result
class exJobsTest(DBTestCase):
    '''Tests for the exJobs registry: creating, storing and looking up ExJob objects.'''

    def setUp(self):
        '''Prepare the database fixture and replace data.pool with a DumpPool.'''
        DBTestCase.setUp(self)
        # Remember the real pool so that tearDown can put it back.
        original_pool = data.pool
        data.pool = DumpPool()
        self.pool = original_pool

    def tearDown(self):
        '''Empty the exJobs registry, restore data.pool and tear down the database fixture.'''
        exJobs.clear()
        data.pool = self.pool
        self.pool = None
        DBTestCase.tearDown(self)

    def testCreate(self):
        # exJobs.create has to hand back an ExJob carrying the given data,
        # leave exactly one Job row in the database and register the ExJob
        # under that row's id.
        number = '123456789'
        with self.session() as session:
            user = User(name='test', apikey='abcdef123456789')
            session.add(user)

        created = exJobs.create(user, [Telnumber(number)], SMS('test'), ['test'])

        self.assertIsInstance(created, ExJob)
        self.assertTrue(vInteger(created.dbjob, None, minv=0))
        # Fresh objects are built for the comparisons, so these checks
        # test equality rather than identity.
        self.assertEqual(created.message, SMS('test'))
        self.assertEqual(created.recipients, [Telnumber(number)])
        self.assertEqual(created.offers, ['test'])
        self.assertEqual(created.tasks, {})

        with self.session() as session:
            stored_ids = session.query(Job.id).all()
            self.assertEqual(stored_ids, [(created.dbjob,)])

        self.assertEqual(exJobs[created.dbjob], created)

    def testGet(self):
        # An ExJob stored under a key must be counted and returned for that key.
        with self.session() as session:
            user = User(name='test', apikey='abcdef123456789')
            session.add(user)

        key = 1
        stored = ExJob(None, [Telnumber('123456789')], SMS('test'), ['test'])
        exJobs[key] = stored

        self.assertEqual(len(exJobs), 1)
        self.assertEqual(stored, exJobs[key])

    def testGetFromDB(self):
        # The `extend` attribute of a database Job must give the ExJob that
        # is registered in exJobs under the Job's id.
        with self.session() as session:
            user = User(name='test', apikey='abcdef123456789')
            dbjob = Job(info="info", status="started")
            user.jobs.append(dbjob)
            session.add(user)

        with self.session() as session:
            # Re-attach both objects to the new session before using them.
            dbjob = session.merge(dbjob)
            user = session.merge(user)
            registered = ExJob(dbjob.id, [Telnumber('123456789')], SMS('test'), ['test'])
            exJobs[dbjob.id] = registered
            self.assertEqual(dbjob.extend, registered)
            self.assertEqual(user.jobs[0].extend, registered)

    def testUnknownExJob(self):
        # Looking up a key that was never registered raises KeyError.
        unknown_key = 'a1234567890'
        self.assertRaises(KeyError, exJobs.__getitem__, unknown_key)
class TestTasks(DBTestCase):
    '''Tests for creating jobs with createJob and for running a single Task.'''

    def setUp(self):
        '''Prepare the database fixture and start the database pool on the reactor.'''
        DBTestCase.setUp(self)
        dbPool.start(reactor)

    def tearDown(self):
        '''Clear the exJobs registry, stop the database pool and reset the task queue.'''
        exJobs.clear()
        dbPool.pool.stop()
        # Swap in an empty deque for the task pool's queue; testCreateSMS
        # relies on qsize() counting only the task it created itself.
        taskPool.pool.q.queue = deque()
        DBTestCase.tearDown(self)

    @inlineCallbacks
    def testCreateSMS(self):
        # createJob for one SMS recipient must queue exactly one task and
        # record a Task for that recipient on the returned job.
        number = '0123325456'
        with self.session() as session:
            u = User(name='test', apikey='abcdef123456789')
            session.add(u)

        job = yield createJob(u, [Telnumber(number)], SMS('sms'), [])

        self.assertEqual(taskPool.pool.q.qsize(), 1)
        # list() keeps the comparison valid even where keys() does not
        # return a plain list.
        self.assertEqual(list(job.tasks.keys()), [Telnumber(number)])
        self.assertIsInstance(job.tasks[Telnumber(number)], Task)

    @inlineCallbacks
    def testRun(self):
        # Running a task with the 'test' offer is expected to yield a Status
        # for that offer with an error state and the text "Error: Test".
        with self.session() as session:
            u = User(name='test', apikey='abcdef123456789')
            session.add(u)

        exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test'])

        task = Task(Telnumber('123456789'), exjob)
        ret = yield task._run()
        self.assertIsInstance(ret, Status)
        self.assertEqual(ret.offer, "test")
        self.assertEqual(ret.status, Status.error)
        self.assertEqual(ret.errtext, "Error: Test")

    @inlineCallbacks
    def testNoRoute(self):
        # Running a task for a job without any offers must fail with NoRoute.
        with self.session() as session:
            u = User(name='test', apikey='abcdef123456789')
            session.add(u)

        exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), [])

        task = Task(Telnumber('123456789'), exjob)
        d = task._run()
        # assertFailure only attaches the check to the Deferred and returns
        # it. It has to be yielded, otherwise the test can finish before the
        # check ran and a missing NoRoute would not fail this test.
        yield self.assertFailure(d, NoRoute)
#def testCreateFax(self):
# with self.session() as session:
# u = User(name='test',apikey='abcdef123456789')
# session.add(u)
# job = createJob(u,[Telnumber('0123325456')],Fax('header','fax',[]),[])
# self.assertEqual(taskPool.pool.q.qsize(),1)
#def testCreateMail(self):
# with self.session() as session:
# u = User(name='test',apikey='abcdef123456789')
# session.add(u)
# job = createJob(u,[],Mail('sub','body','t@t.de'),[])