|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 import unittest |
|
4 import server |
|
5 import threading, xmlrpclib |
|
6 from multiprocessing import Queue |
|
7 from multiprocessing.managers import BaseManager |
|
8 |
|
class StoppableXMLRPCServer(server.SecureUserDBXMLRPCServer, threading.Thread):
    """Secure user-DB XML-RPC server that serves requests from its own thread.

    ``run`` handles one request at a time for as long as ``running`` is
    true; ``stop`` clears the flag and closes the server.
    """

    # Loop flag checked by run() before each request; cleared by stop().
    running = True

    def __init__(self, *args, **kwargs):
        """Initialise both base classes.

        All positional and keyword arguments are forwarded to
        ``server.SecureUserDBXMLRPCServer``; the thread takes none.
        """
        server.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
        threading.Thread.__init__(self)

    def run(self):
        """Thread body: handle requests one by one while ``running`` is set."""
        # The serving loop has to run in a thread of its own so that it can
        # be interrupted. The flag is only re-checked between requests.
        while self.running:
            self.handle_request()

    def stop(self):
        """Clear the ``running`` flag and call ``server_close()``."""
        self.running = False
        self.server_close()
|
26 |
|
27 |
|
def init_server():
    """Assemble the XML-RPC server used by the tests and return it unstarted.

    Creates the accounts ``test``, ``test2`` (``server.User``) and ``admin``
    (``server.Admin``), each with a password equal to its name, starts a
    ``BaseManager`` with ``server.MessageJob`` registered, configures a
    sipgate provider from ``iro.conf`` and wires everything into a
    ``StoppableXMLRPCServer`` listening on ``localhost:8000``.
    """
    accounts = [
        {"name": login, "password": login, "class": user_class}
        for login, user_class in (
            ("test", server.User),
            ("test2", server.User),
            ("admin", server.Admin),
        )
    ]

    class MyManager(BaseManager):
        pass

    MyManager.register('MessageJob', server.MessageJob)
    manager = MyManager()
    manager.start()

    # Create and configure the provider.
    import anbieter
    sipgate_provider = anbieter.sipgate()
    sipgate_provider.read_basic_config("iro.conf")
    mail_provider = ""

    # Build the user database on top of the job queue.
    job_channel = Queue()
    providers = {
        "sms": sipgate_provider,
        "fax": sipgate_provider,
        "mail": mail_provider,
    }
    jobs = server.Jobs(manager, job_channel, providers)
    user_db = server.MyUserDB(accounts, jobs)

    # Create the server; the caller is responsible for starting it.
    xmlrpc_server = StoppableXMLRPCServer(
        addr=("localhost", 8000),
        userdb=user_db,
        certificate="./certs/test.cert.pem",
        privatekey="./certs/test.key.pem",
    )
    # NOTE(review): the attribute is spelled "relam" (not "realm"); kept as
    # is because the name the server class reads is not visible here.
    xmlrpc_server.relam = "xmlrpc"
    return xmlrpc_server
|
62 |
|
63 |
|
class TestServer(unittest.TestCase):
    """End-to-end tests that talk to a freshly started server over XML-RPC."""

    def setUp(self):
        """Build a new server via ``init_server`` and start its thread."""
        self.serv = init_server()
        self.serv.start()

    def tearDown(self):
        """Stop the server, then issue one last request."""
        self.serv.stop()
        # Fetch one last message so that the server gets closed properly
        # (presumably this lets the serving loop leave its pending
        # handle_request() and see the cleared flag -- confirm).
        closing_client = xmlrpclib.Server("https://test:test@localhost:8000")
        closing_client.status()

    def testLogin(self):
        """Valid credentials get an empty status; mixed-up ones are rejected."""
        test_client = xmlrpclib.Server("https://test:test@localhost:8000")
        self.assertEqual(test_client.status(), {})

        test2_client = xmlrpclib.Server("https://test2:test2@localhost:8000")
        self.assertEqual(test2_client.status(), {})

        wrong_for_test2 = xmlrpclib.Server("https://test2:test@localhost:8000")
        self.assertRaises(xmlrpclib.ProtocolError, wrong_for_test2.status)

        wrong_for_test = xmlrpclib.Server("https://test:test2@localhost:8000")
        self.assertRaises(xmlrpclib.ProtocolError, wrong_for_test.status)

    def testsendSMS(self):
        """A started SMS job is reported in state ``init`` under its id."""
        client = xmlrpclib.Server("https://test:test@localhost:8000")
        job_id = client.startSMS("test", ["01234"])
        expected = {job_id: {'status': ['init', {}], 'name': {'content': 'test'}}}
        self.assertEqual(client.status(job_id), expected)

    def testTwoUser(self):
        """Users only see their own jobs, while the admin sees all of them."""
        client1 = xmlrpclib.Server("https://test:test@localhost:8000")
        client2 = xmlrpclib.Server("https://test2:test2@localhost:8000")
        admin_client = xmlrpclib.Server("https://admin:admin@localhost:8000")

        id1 = client1.startSMS("test", ["01234"])
        job1 = {'status': ['init', {}], 'name': {'content': 'test'}}
        self.assertEqual(client2.status(), {})
        self.assertEqual(admin_client.status(id1), {id1: job1})

        id2 = client2.startSMS("test2", ["01234"])
        job2 = {'status': ['init', {}], 'name': {'content': 'test2'}}
        self.assertNotEqual(id1, id2)
        self.assertEqual(client1.status(), {id1: job1})
        self.assertEqual(client2.status(), {id2: job2})
        self.assertEqual(admin_client.status(), {id1: job1, id2: job2})

        # Asking for another user's job must fail with an XML-RPC fault.
        self.assertRaises(xmlrpclib.Fault, client2.status, id1)
        self.assertRaises(xmlrpclib.Fault, client1.status, id2)
|
105 |
|
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()