# -*- coding: utf-8 -*-
import unittest
import server
import threading, xmlrpclib
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
class StoppableXMLRPCServer(server.SecureUserDBXMLRPCServer, threading.Thread):
    """XML-RPC server whose request loop runs on its own thread.

    Combines ``server.SecureUserDBXMLRPCServer`` with ``threading.Thread``:
    ``start()`` runs :meth:`run` in a separate thread and :meth:`stop`
    asks that loop to finish.
    """

    # Loop flag polled by run(); stop() overrides it on the instance.
    running = True

    def __init__(self, *args, **kwargs):
        """Initialise both bases; all arguments are passed to the server base."""
        server.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
        threading.Thread.__init__(self)

    def run(self):
        """Handle requests one at a time for as long as ``running`` is true."""
        # The serving loop (the *serve_forever* equivalent) has to run in its
        # own thread so that it can be interrupted! The flag is only checked
        # between handle_request() calls.
        while self.running:
            self.handle_request()

    def stop(self):
        """Clear the ``running`` flag and call ``server_close()``."""
        self.running = False
        self.server_close()
def init_server():
    """Assemble a StoppableXMLRPCServer with test users and providers.

    Starts a ``BaseManager`` subprocess with ``server.MessageJob``
    registered, configures the sipgate provider from ``iro.conf`` and
    builds the user database around a fresh job queue.

    Returns:
        The server instance created for ("localhost", 8000). It is not
        started here; the caller is expected to call ``start()``.
    """
    accounts = [
        {"name": name, "password": password, "class": role}
        for name, password, role in (
            ("test", "test", server.User),
            ("test2", "test2", server.User),
            ("admin", "admin", server.Admin),
        )
    ]

    class MyManager(BaseManager):
        pass

    MyManager.register('MessageJob', server.MessageJob)
    job_manager = MyManager()
    job_manager.start()

    # Create and configure the provider.
    import anbieter
    sipgate = anbieter.sipgate()
    sipgate.read_basic_config("iro.conf")
    mail_host = ""

    # Build the user database.
    message_queue = Queue()
    providers = {"sms": sipgate, "fax": sipgate, "mail": mail_host}
    jobs = server.Jobs(job_manager, message_queue, providers)
    user_db = server.MyUserDB(accounts, jobs)

    # Create the server (the caller starts it).
    rpc_server = StoppableXMLRPCServer(
        addr=("localhost", 8000),
        userdb=user_db,
        certificate="./certs/test.cert.pem",
        privatekey="./certs/test.key.pem",
    )
    # NOTE(review): "relam" looks like a misspelling of "realm"; kept as-is
    # because the server module may read this exact attribute name -- confirm.
    rpc_server.relam = "xmlrpc"
    return rpc_server
class TestServer(unittest.TestCase):
    """End-to-end tests against a server created by ``init_server()``.

    Every test gets a freshly started server on localhost:8000 and talks
    to it through ``xmlrpclib.Server`` proxies.
    """

    # Upper bound (seconds) for waiting on the server thread in tearDown, so
    # that a stuck server thread cannot hang the whole test run.
    SHUTDOWN_TIMEOUT = 5

    @staticmethod
    def _job(content):
        """Return the status entry the tests expect for a just-started job."""
        return {'status': ['init', {}], 'name': {'content': content}}

    def setUp(self):
        """Create and start a fresh server for the test."""
        self.serv = init_server()
        self.serv.start()

    def tearDown(self):
        """Stop the server and wait for its thread to finish."""
        self.serv.stop()
        # Fetch one last message so that the server gets closed properly:
        # the request loop only re-checks its flag after a request has been
        # handled.
        xmlrpclib.Server("https://test:test@localhost:8000").status()
        # Wait for the server thread to leave its loop before the next
        # setUp creates another server on the same port.
        self.serv.join(self.SHUTDOWN_TIMEOUT)

    def testLogin(self):
        # Valid credentials are expected to yield an empty status dict.
        for url in ("https://test:test@localhost:8000",
                    "https://test2:test2@localhost:8000"):
            self.assertEqual(xmlrpclib.Server(url).status(), {})
        # Mismatched user/password pairs are expected to be rejected.
        for url in ("https://test2:test@localhost:8000",
                    "https://test:test2@localhost:8000"):
            self.assertRaises(xmlrpclib.ProtocolError,
                              xmlrpclib.Server(url).status)

    def testsendSMS(self):
        client = xmlrpclib.Server("https://test:test@localhost:8000")
        job_id = client.startSMS("test", ["01234"])
        self.assertEqual(client.status(job_id), {job_id: self._job("test")})

    def testTwoUser(self):
        client1 = xmlrpclib.Server("https://test:test@localhost:8000")
        client2 = xmlrpclib.Server("https://test2:test2@localhost:8000")
        admin_client = xmlrpclib.Server("https://admin:admin@localhost:8000")

        # A job started by the first user is not listed for the second user,
        # but the admin can query it.
        id1 = client1.startSMS("test", ["01234"])
        self.assertEqual(client2.status(), {})
        self.assertEqual(admin_client.status(id1), {id1: self._job("test")})

        id2 = client2.startSMS("test2", ["01234"])
        self.assertNotEqual(id1, id2)

        # Each user lists only their own job; the admin lists both.
        self.assertEqual(client1.status(), {id1: self._job("test")})
        self.assertEqual(client2.status(), {id2: self._job("test2")})
        self.assertEqual(admin_client.status(),
                         {id1: self._job("test"), id2: self._job("test2")})

        # Asking for another user's job id is expected to raise a Fault.
        self.assertRaises(xmlrpclib.Fault, client2.status, id1)
        self.assertRaises(xmlrpclib.Fault, client1.status, id2)
if __name__ == "__main__":
    # Run the tests of this module when the file is executed as a script.
    unittest.main()