|
135
|
1 |
import unittest |
|
|
2 |
|
|
|
3 |
from iro.controller.task import createJob |
|
|
4 |
|
|
|
5 |
from iro.model.job import jobs, Job |
|
|
6 |
|
|
|
7 |
from iro.model.pool import data |
|
|
8 |
|
|
|
9 |
from iro.model.message import SMS, Fax, Mail |
|
|
10 |
from iro.model.schema import Job as DBJob, User |
|
|
11 |
from iro.telnumber import Telnumber |
|
|
12 |
|
|
|
13 |
from iro.error import JobNotFound |
|
|
14 |
from iro.validate import vHash |
|
|
15 |
|
|
|
16 |
|
|
|
17 |
from .dbtestcase import DBTestCase, setUpModule, tearDownModule |
|
|
18 |
|
|
|
19 |
# Activate all the logging we can get: configure the stdlib root logger at
# DEBUG level and start a Twisted observer that hands Twisted log events to
# the stdlib ``logging`` module.
from twisted.python import log
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s')
observer = log.PythonLoggingObserver()
observer.start()
|
|
26 |
|
|
|
27 |
class DumpPool(object):
    """Stand-in pool whose ``run`` executes the callable synchronously."""

    def run(self, f, *a, **k):
        """Call ``f(*a, **k)`` right away and return whatever it returns.

        Exceptions raised by ``f`` propagate to the caller unchanged.
        """
        return f(*a, **k)
|
|
30 |
|
|
|
31 |
|
|
|
32 |
# Install the synchronous stand-in as the shared pool, so that anything run
# through ``data.pool.run`` is executed inline in the calling test.
# NOTE(review): presumably the production pool is asynchronous -- confirm.
data.pool = DumpPool()
|
|
33 |
|
|
|
34 |
|
|
|
35 |
class JobsTest(DBTestCase):
    """Tests for the ``jobs`` container from ``iro.model.job``."""

    def tearDown(self):
        """Empty the ``jobs`` container, then run the base-class teardown."""
        jobs.clear()
        DBTestCase.tearDown(self)

    def testCreate(self):
        """``jobs.create`` returns a ``Job``, persists it and registers it."""
        with self.session() as session:
            u = User(name='test', apikey='abcdef123456789')
            session.add(u)

        job = jobs.create(u, [Telnumber('123456789')], SMS('test'), ['test'])
        self.assertIsInstance(job, Job)
        # NOTE(review): presumably the two 40s are min/max length of the
        # hash -- confirm against iro.validate.vHash.
        self.assertTrue(vHash(job.id, None, 40, 40))

        # The job table must contain exactly one row, keyed by the new id.
        with self.session() as session:
            j = session.query(DBJob.hash).all()
            self.assertEqual(j, [(job.id,)])

        self.assertEqual(jobs[job.id], job)

    def testGet(self):
        """A ``Job`` stored under a key is returned again for that key."""
        with self.session() as session:
            u = User(name='test', apikey='abcdef123456789')
            session.add(u)

        job = Job(u, [Telnumber('123456789')], SMS('test'), ['test'])
        jobs['a1'] = job

        self.assertEqual(len(jobs), 1)
        self.assertEqual(job, jobs['a1'])

    def testGetFromDB(self):
        """Looking up a key that only exists in the database yields the job."""
        with self.session() as session:
            u = User(name='test', apikey='abcdef123456789')
            job = DBJob(hash='a1', info="info", status="started")
            u.jobs.append(job)
            session.add(u)

        # Nothing is put into ``jobs`` in this test, so the lookups below can
        # only succeed if the container loads the job from the database.
        with self.session() as session:
            # Re-attach the objects to this session before taking their repr.
            job = session.merge(job)
            u = session.merge(u)
            self.assertEqual(repr(jobs['a1'].dbjob), repr(job))
            self.assertEqual(repr(jobs['a1'].user), repr(u))
            self.assertEqual(jobs['a1'].info, 'info')

    def testUnknownJob(self):
        """Unknown hashes raise ``JobNotFound`` / ``KeyError`` respectively."""
        with self.assertRaises(JobNotFound):
            Job.fromDB('a1234567890')

        with self.assertRaises(KeyError):
            jobs['a1234567890']

    @unittest.skip('test not implemented')
    def testSyncroniced(self):
        """Placeholder; skipped until the test is written."""
        pass
|
|
92 |
|
|
|
93 |
class JobTest(DBTestCase):
    """Smoke tests calling ``createJob`` once per message type.

    NOTE(review): none of these tests asserts anything about the result;
    they only fail if ``createJob`` raises.
    """

    def testCreateSMS(self):
        """``createJob`` accepts an ``SMS`` message."""
        createJob([], SMS('sms'), [])

    def testCreateFax(self):
        """``createJob`` accepts a ``Fax`` message."""
        createJob([], Fax('header', 'fax', []), [])

    def testCreateMail(self):
        """``createJob`` accepts a ``Mail`` message."""
        createJob([], Mail('sub', 'body', 't@t.de'), [])
|
|
104 |
|
|
|
105 |
|
|
|
106 |
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()