1 import unittest |
1 import unittest |
|
2 from Queue import deque |
2 |
3 |
3 from iro.controller.task import createJob |
4 from iro.controller.task import createJob, Task |
|
5 from iro.controller.pool import taskPool |
4 |
6 |
5 from iro.model.job import jobs, Job |
7 from iro.model.job import exJobs, ExJob |
6 |
8 |
7 from iro.model.pool import data |
9 from iro.model.pool import data |
8 |
10 |
9 from iro.model.message import SMS, Fax, Mail |
11 from iro.model.message import SMS, Fax, Mail |
10 from iro.model.schema import Job as DBJob, User |
12 from iro.model.schema import Job, User |
11 from iro.telnumber import Telnumber |
13 from iro.telnumber import Telnumber |
12 |
14 |
13 from iro.error import JobNotFound |
15 from iro.validate import vInteger |
14 from iro.validate import vHash |
|
15 |
16 |
16 |
17 |
17 from .dbtestcase import DBTestCase, setUpModule, tearDownModule |
18 from .dbtestcase import DBTestCase |
18 |
19 |
19 #activates all logging we can get. |
20 #activates all logging we can get. |
20 |
|
21 from twisted.python import log |
|
22 import logging |
|
23 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s') |
|
24 observer = log.PythonLoggingObserver() |
|
25 observer.start() |
|
26 |
21 |
27 class DumpPool(): |
22 class DumpPool(): |
28 def run(self, f,*a,**k): |
23 def run(self, f,*a,**k): |
29 return f(*a,**k) |
24 return f(*a,**k) |
30 |
25 |
31 |
26 |
32 data.pool=DumpPool() |
27 class exJobsTest(DBTestCase): |
|
28 '''tests for exJobs''' |
33 |
29 |
34 |
30 def setUp(self): |
35 class JobsTest(DBTestCase): |
31 DBTestCase.setUp(self) |
36 '''tests for jobs''' |
32 self.pool = data.pool |
|
33 data.pool = DumpPool() |
37 |
34 |
38 def tearDown(self): |
35 def tearDown(self): |
39 jobs.clear() |
36 exJobs.clear() |
|
37 data.pool = self.pool |
|
38 self.pool = None |
40 DBTestCase.tearDown(self) |
39 DBTestCase.tearDown(self) |
41 |
40 |
42 def testCreate(self): |
41 def testCreate(self): |
43 with self.session() as session: |
42 with self.session() as session: |
44 u = User(name='test',apikey='abcdef123456789') |
43 u = User(name='test',apikey='abcdef123456789') |
45 session.add(u) |
44 session.add(u) |
46 |
45 |
47 job = jobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) |
46 job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) |
48 self.assertIsInstance(job, Job) |
47 self.assertIsInstance(job, ExJob) |
49 self.assertTrue(vHash(job.id, None, 40, 40)) |
48 self.assertTrue(vInteger(job.dbjob, None, minv=0 )) |
|
49 self.assertEqual(job.message, SMS('test')) |
|
50 self.assertEqual(job.recipients, [Telnumber('123456789')]) |
|
51 self.assertEqual(job.offers,['test']) |
|
52 self.assertEqual(job.tasks,{}) |
50 |
53 |
51 with self.session() as session: |
54 with self.session() as session: |
52 j = session.query(DBJob.hash).all() |
55 j = session.query(Job.id).all() |
53 self.assertEqual(j,[(job.id,)]) |
56 self.assertEqual(j,[(job.dbjob,)]) |
54 |
57 |
55 self.assertEqual(jobs[job.id],job) |
58 self.assertEqual(exJobs[job.dbjob],job) |
56 |
59 |
57 def testGet(self): |
60 def testGet(self): |
58 with self.session() as session: |
61 with self.session() as session: |
59 u = User(name='test',apikey='abcdef123456789') |
62 u = User(name='test',apikey='abcdef123456789') |
60 session.add(u) |
63 session.add(u) |
61 |
64 |
62 job = Job(u, [Telnumber('123456789')], SMS('test'), ['test']) |
65 job = ExJob(None, [Telnumber('123456789')], SMS('test'), ['test']) |
63 jobs['a1'] = job |
66 exJobs[1] = job |
64 |
67 |
65 self.assertEqual(len(jobs), 1) |
68 self.assertEqual(len(exJobs), 1) |
66 self.assertEqual(job, jobs['a1']) |
69 self.assertEqual(job, exJobs[1]) |
67 |
70 |
68 def testGetFromDB(self): |
71 def testGetFromDB(self): |
69 with self.session() as session: |
72 with self.session() as session: |
70 u = User(name='test',apikey='abcdef123456789') |
73 u = User(name='test',apikey='abcdef123456789') |
71 job = DBJob(hash='a1', info="info", status="started") |
74 job = Job( info="info", status="started") |
72 u.jobs.append(job) |
75 u.jobs.append(job) |
73 session.add(u) |
76 session.add(u) |
74 |
77 |
75 with self.session() as session: |
78 with self.session() as session: |
76 job = session.merge(job) |
79 job = session.merge(job) |
77 u = session.merge(u) |
80 u = session.merge(u) |
78 self.assertEqual(repr(jobs['a1'].dbjob),repr(job)) |
81 ejob= ExJob(job.id, [Telnumber('123456789')], SMS('test'), ['test']) |
79 self.assertEqual(repr(jobs['a1'].user),repr(u)) |
82 exJobs[job.id]=ejob |
80 self.assertEqual(jobs['a1'].info, 'info') |
83 self.assertEqual(job.extend, ejob) |
|
84 self.assertEqual(u.jobs[0].extend,ejob) |
81 |
85 |
82 def testUnknownJob(self): |
86 def testUnknownExJob(self): |
83 with self.assertRaises(JobNotFound): |
87 self.assertRaises(KeyError,exJobs.__getitem__,'a1234567890') |
84 Job.fromDB('a1234567890') |
|
85 |
|
86 with self.assertRaises(KeyError): |
|
87 jobs['a1234567890'] |
|
88 |
|
89 @unittest.skip('test not implemented') |
|
90 def testSyncroniced(self): |
|
91 pass |
|
92 |
88 |
93 class JobTest(DBTestCase): |
89 class JobTest(DBTestCase): |
|
90 |
|
91 def setUp(self): |
|
92 DBTestCase.setUp(self) |
|
93 self.pool = data.pool |
|
94 data.pool = DumpPool() |
|
95 |
|
96 def tearDown(self): |
|
97 exJobs.clear() |
|
98 data.pool = self.pool |
|
99 self.pool = None |
|
100 taskPool.pool.q.queue = deque() |
|
101 DBTestCase.tearDown(self) |
|
102 |
94 def testCreateSMS(self): |
103 def testCreateSMS(self): |
95 job = createJob([],SMS('sms'),[]) |
104 with self.session() as session: |
96 pass |
105 u = User(name='test',apikey='abcdef123456789') |
|
106 session.add(u) |
|
107 exjob = createJob(u,[Telnumber('0123325456')],SMS('sms'),[]) |
|
108 self.assertEqual(taskPool.pool.q.qsize(),1) |
|
109 self.assertEqual(exjob.tasks.keys(),[Telnumber('0123325456')]) |
|
110 self.assertIsInstance(exjob.tasks[Telnumber('0123325456')], Task) |
|
111 |
|
112 |
97 |
113 |
98 def testCreateFax(self): |
114 def testCreateFax(self): |
99 job = createJob([],Fax('header','fax',[]),[]) |
115 with self.session() as session: |
100 pass |
116 u = User(name='test',apikey='abcdef123456789') |
|
117 session.add(u) |
|
118 job = createJob(u,[Telnumber('0123325456')],Fax('header','fax',[]),[]) |
|
119 |
|
120 self.assertEqual(taskPool.pool.q.qsize(),1) |
101 |
121 |
102 def testCreateMail(self): |
122 def testCreateMail(self): |
103 job = createJob([],Mail('sub','body','t@t.de'),[]) |
123 with self.session() as session: |
|
124 u = User(name='test',apikey='abcdef123456789') |
|
125 session.add(u) |
|
126 job = createJob(u,[],Mail('sub','body','t@t.de'),[]) |
104 |
127 |
105 |
128 |
106 if __name__ == '__main__': |
129 if __name__ == '__main__': |
107 unittest.main() |
130 unittest.main() |