|
1 from multiprocessing import Process |
|
2 import unittest |
|
3 |
|
4 from datetime import datetime |
|
5 |
|
6 import time |
|
7 |
|
8 from xmlrpclib import Server as xServer, ServerProxy, Fault |
|
9 |
|
10 from iro.model.schema import User, Offer, Userright, Job, Message |
|
11 |
|
12 from iro.main import runReactor |
|
13 |
|
14 import iro.error as IroError |
|
15 |
|
16 from ..test_helpers.dbtestcase import DBTestCase |
|
17 |
|
18 |
|
19 class XMLRPCTest(DBTestCase): |
|
20 """tests for the xmlrpc interface""" |
|
21 def setUp(self): |
|
22 DBTestCase.setUp(self) |
|
23 self.s = Process(target=startReactor, args=(self.engine,)) |
|
24 self.s.start() |
|
25 #the new process needs time to get stated, so this process has to sleep |
|
26 time.sleep(.2) |
|
27 |
|
28 def tearDown(self): |
|
29 self.__debug().stop() |
|
30 time.sleep(.2) |
|
31 self.s.join() |
|
32 DBTestCase.tearDown(self) |
|
33 |
|
34 def __debug(self): |
|
35 return xServer('http://localhost:7080/debug') |
|
36 |
|
37 def __rpc2(self): |
|
38 return ServerProxy('http://localhost:7080/RPC2') |
|
39 |
|
40 def testDebugHello(self): |
|
41 '''simple test for the connection to xmlrpc server''' |
|
42 ret=self.__debug().hello() |
|
43 self.failUnlessEqual(ret,'hello') |
|
44 |
|
45 def testListMethods(self): |
|
46 '''list of all offical Methods, that can be executed''' |
|
47 ret=self.__rpc2().listMethods() |
|
48 self.failUnlessEqual(ret, ['listMethods', 'status', 'stop', 'sms', 'fax', 'mail', 'routes', 'defaultRoute', 'bill', 'telnumber','email']) |
|
49 |
|
50 def testStatus(self): |
|
51 ''' test the status function''' |
|
52 with self.session() as session: |
|
53 u = User(name='test',apikey='abcdef123456789') |
|
54 session.add(User(name='test',apikey='abcdef123456789')) |
|
55 self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), {}) |
|
56 |
|
57 with self.session() as session: |
|
58 u = session.merge(u) |
|
59 j = Job(info='info', status="started") |
|
60 j.user=u |
|
61 session.add(j) |
|
62 session.commit() |
|
63 jid=j.id |
|
64 status = {str(jid):{"status":"started"}} |
|
65 self.failUnlessEqual(self.__rpc2().status('abcdef123456789',jid), status) |
|
66 self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), status) |
|
67 self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'false'), status) |
|
68 self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 0), status) |
|
69 |
|
70 #JobNotFound |
|
71 exc = self.assertRaises(Fault, self.__rpc2().status, 'abcdef123456789',jid+1) |
|
72 unf = IroError.JobNotFound() |
|
73 self.failUnlessEqual(exc.faultCode, unf.code) |
|
74 self.failUnlessEqual(exc.faultString, unf.msg) |
|
75 |
|
76 #self.failUnlessEqual(self.__rpc2().status('abcdef123456789','abcde', True), ["<User('test','abcdef123456789')>",'abcde', True]) |
|
77 #self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'true'), ["<User('test','abcdef123456789')>", '', True]) |
|
78 #self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 1), ["<User('test','abcdef123456789')>", '', True]) |
|
79 |
|
80 def testNoSuchUser(self): |
|
81 '''a unknown user should raise a UserNotNound Exception |
|
82 bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception''' |
|
83 exc = self.assertRaises(Fault, self.__rpc2().status, 'abcdef123456789') |
|
84 unf=IroError.UserNotFound() |
|
85 self.failUnlessEqual(exc.faultCode, unf.code) |
|
86 self.failUnlessEqual(exc.faultString, unf.msg) |
|
87 |
|
88 def testNoSuchMethod(self): |
|
89 '''a unknown mothod should raise a Exception ''' |
|
90 exc = self.assertRaises(Fault, self.__rpc2().nosuchmethod) |
|
91 self.failUnlessEqual(exc.faultCode, 8001) |
|
92 self.failUnlessEqual(exc.faultString, "procedure nosuchmethod not found") |
|
93 |
|
94 def testValidationFault(self): |
|
95 '''a validate Exception should be translated to a xmlrpclib.Fault.''' |
|
96 exc = self.assertRaises(Fault, self.__rpc2().status,'xxx') |
|
97 self.failUnlessEqual(exc.faultCode, 700) |
|
98 self.failUnlessEqual(exc.faultString, "Validation of 'apikey' failed.") |
|
99 |
|
100 def testRoutes(self): |
|
101 '''test the route function''' |
|
102 with self.session() as session: |
|
103 u=User(name='test',apikey='abcdef123456789') |
|
104 o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") |
|
105 u.rights.append(Userright(o)) |
|
106 session.add(u) |
|
107 self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic']) |
|
108 |
|
109 exc = self.assertRaises(Fault, self.__rpc2().routes,'abcdef123456789','fax') |
|
110 self.failUnlessEqual(exc.faultCode, 700) |
|
111 self.failUnlessEqual(exc.faultString, "Typ is not valid.") |
|
112 |
|
113 with self.session() as session: |
|
114 o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") |
|
115 u = session.query(User).filter_by(name="test").first() |
|
116 u.rights.append(Userright(o)) |
|
117 o=Offer(name="faxde", provider="faxde", route="", typ="fax") |
|
118 session.add(o) |
|
119 session.commit() |
|
120 self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus']) |
|
121 self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),[]) |
|
122 |
|
123 with self.session() as session: |
|
124 u = session.query(User).filter_by(name="test").first() |
|
125 u.rights.append(Userright(o)) |
|
126 |
|
127 self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus']) |
|
128 self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),['faxde']) |
|
129 |
|
130 def testDefaultRoutes(self): |
|
131 '''test the defaultRoute function''' |
|
132 with self.session() as session: |
|
133 u=User(name='test',apikey='abcdef123456789') |
|
134 o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") |
|
135 u.rights.append(Userright(o,True)) |
|
136 o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") |
|
137 u.rights.append(Userright(o)) |
|
138 session.add(u) |
|
139 self.failUnlessEqual(self.__rpc2().defaultRoute('abcdef123456789','sms'),['sipgate_basic']) |
|
140 |
|
141 def testTelnumbers(self): |
|
142 '''test the telefon validator''' |
|
143 self.failUnlessEqual(self.__rpc2().telnumber(["0123/456(78)","+4912346785433","00123435456-658"]),True) |
|
144 |
|
145 invalid=['xa','+1','1-23',';:+0','0123'] |
|
146 |
|
147 exc = self.assertRaises(Fault, self.__rpc2().telnumber,['01234']+invalid) |
|
148 self.failUnlessEqual(exc.faultCode, 701) |
|
149 self.failUnlessEqual(exc.faultString, "No valid telnumber: '%s'" % invalid[0]) |
|
150 |
|
151 |
|
152 def testVaildEmail(self): |
|
153 '''test vaild email adresses (got from wikipedia)''' |
|
154 validmails=["niceandsimple@example.com"] |
|
155 self.failUnlessEqual(self.__rpc2().email(validmails),True) |
|
156 |
|
157 def testInvaildEmail(self): |
|
158 '''test invaild email adresses (got from wikipedia)''' |
|
159 invalid=["Abc.example.com",] |
|
160 exc= self.assertRaises(Fault, self.__rpc2().email, invalid) |
|
161 self.failUnlessEqual(exc.faultCode, 702) |
|
162 self.failUnlessEqual(exc.faultString, "No valid email: '%s'" % invalid[0]) |
|
163 |
|
164 def testBill(self): |
|
165 '''test bill function''' |
|
166 apikey='abcdef123456789' |
|
167 with self.session() as session: |
|
168 u=User(name='test',apikey=apikey) |
|
169 session.add(u) |
|
170 |
|
171 self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.0,'anz':0}}) |
|
172 |
|
173 with self.session() as session: |
|
174 u = session.merge(u) |
|
175 o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") |
|
176 u.rights.append(Userright(o)) |
|
177 j = Job(info='i',status='sended') |
|
178 j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) |
|
179 u.jobs.append(j) |
|
180 |
|
181 j = Job(info='a',status='sended') |
|
182 j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o)) |
|
183 u.jobs.append(j) |
|
184 |
|
185 ret=self.__rpc2().bill(apikey) |
|
186 self.failUnlessEqual(ret['total'],{'price':0.8,'anz':2}) |
|
187 self.failUnlessEqual(ret['sipgate_basic'], |
|
188 {'price':0.8,'anz':2, |
|
189 'info':{'i':{'price':0.4,'anz':1}, |
|
190 'a':{'price':0.4,'anz':1}, |
|
191 } |
|
192 }) |
|
193 |
|
194 |
|
195 def startReactor(engine): |
|
196 """starts the Rector with a special debug Clild, so that the reactor can be stopped remotly. """ |
|
197 from twisted.internet import reactor |
|
198 from twisted.web import xmlrpc, resource |
|
199 |
|
200 from iro.view.xmlrpc import appendResource |
|
201 |
|
202 class XMLRPCDebug(xmlrpc.XMLRPC): |
|
203 def xmlrpc_stop(self): |
|
204 reactor.callLater(0.1,reactor.stop) |
|
205 return "" |
|
206 |
|
207 def xmlrpc_hello(self): |
|
208 return "hello" |
|
209 |
|
210 root = resource.Resource() |
|
211 root = appendResource(root) |
|
212 root.putChild('debug', XMLRPCDebug()) |
|
213 runReactor(reactor, engine, root) |
|
214 |
|
215 if __name__ == '__main__': |
|
216 unittest.main() |