# -*- coding: utf-8 -*-
import unittest, ConfigParser
import iro
import threading, xmlrpclib
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
class StoppableXMLRPCServer(iro.SecureUserDBXMLRPCServer, threading.Thread):
    """XML-RPC server that handles requests in its own thread until stopped.

    ``run`` handles one request at a time for as long as ``running`` is
    true; ``stop`` clears that flag and closes the server.
    """

    # Loop flag read by run(); stop() overrides it on the instance.
    running = True

    def __init__(self, *args, **kwargs):
        """Initialise both base classes.

        All positional and keyword arguments are passed on unchanged to
        ``iro.SecureUserDBXMLRPCServer``; the thread part takes none.
        """
        iro.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
        threading.Thread.__init__(self)

    def run(self):
        """Thread body: handle requests one by one while ``running`` is set."""
        # *serve_forever* has to run in a thread of its own so that it can
        # be interrupted! The flag is only re-checked between two requests.
        while self.running:
            self.handle_request()

    def stop(self):
        """Clear the ``running`` flag and close the server."""
        self.running = False
        self.server_close()
def init_server():
    """Assemble a ``StoppableXMLRPCServer`` for the tests and return it.

    Starts a ``BaseManager`` subclass with ``MessageJob`` registered,
    configures the providers from "iro.conf", builds the job queue and a
    user database holding the users "test", "test2" and "admin", and binds
    the server to localhost:8000 using the certificate and key named in the
    ``[server]`` section of "iro.conf".

    The returned server has not been started; the caller is expected to
    call ``start()`` on it.
    """
    class MyManager(BaseManager):
        pass

    MyManager.register('MessageJob', iro.MessageJob)
    manager = MyManager()
    manager.start()

    # Create and configure the providers.
    import anbieter
    sipgate = anbieter.sipgate()
    sipgate.read_basic_config("iro.conf")
    smtp = iro.MySMTP()
    smtp.read_basic_config("iro.conf")
    # Configured like the others, but not part of the provider mapping below.
    smstrade = iro.MySmstrade()
    smstrade.read_basic_config("iro.conf")

    # Create the user database.
    job_queue = iro.Jobs(
        manager,
        Queue(),
        {"sms": sipgate, "fax": sipgate, "mail": smtp},
    )
    users = [
        {"name": "test", "password": "test", "class": iro.User},
        {"name": "test2", "password": "test2", "class": iro.User},
        {"name": "admin", "password": "admin", "class": iro.Admin},
    ]
    user_db = iro.MyUserDB(users, job_queue)

    # Start the server (certificate and key paths come from the config file).
    config = ConfigParser.ConfigParser()
    config.read(["iro.conf"])
    server = StoppableXMLRPCServer(
        addr=("localhost", 8000),
        userdb=user_db,
        certificate=config.get('server', 'cert'),
        privatekey=config.get('server', 'key'),
    )
    # NOTE(review): the attribute is spelled "relam" (not "realm"); kept
    # as-is because it is presumably the name iro reads -- confirm.
    server.relam = "xmlrpc"
    return server
class TestServer(unittest.TestCase):
    """End-to-end tests against a server built by ``init_server()``.

    Every test starts a fresh server on localhost:8000 in ``setUp`` and
    talks to it over XML-RPC with HTTP basic-auth credentials embedded in
    the URL.
    """

    def _client(self, user, password):
        """Return an XML-RPC proxy for the test server with the given login."""
        return xmlrpclib.Server(
            "https://%s:%s@localhost:8000" % (user, password))

    def setUp(self):
        """Create the server and start its serving thread."""
        self.serv = init_server()
        self.serv.start()

    def tearDown(self):
        """Stop the server, then issue one final request."""
        self.serv.stop()
        # Fetch one last message so that the server gets closed properly.
        self._client("test", "test").status()

    def testLogin(self):
        """Valid logins get an empty status; wrong passwords are rejected."""
        self.assertEqual(self._client("test", "test").status(), {})
        self.assertEqual(self._client("test2", "test2").status(), {})
        self.assertRaises(xmlrpclib.ProtocolError,
                          self._client("test2", "test").status)
        self.assertRaises(xmlrpclib.ProtocolError,
                          self._client("test", "test2").status)

    def testsendSMS(self):
        """A newly started SMS job is reported in state 'init'."""
        client = self._client("test", "test")
        job_id = client.startSMS("test", ["01234"])
        self.assertEqual(
            client.status(job_id),
            {job_id: {'status': ['init', {}], 'name': 'test'}})

    def testTwoUser(self):
        """Users see only their own jobs; the admin sees the jobs of both."""
        client1 = self._client("test", "test")
        client2 = self._client("test2", "test2")
        admin = self._client("admin", "admin")

        id1 = client1.startSMS("test", ["01234"])
        self.assertEqual(client2.status(), {})
        self.assertEqual(
            admin.status(id1),
            {id1: {'status': ['init', {}], 'name': 'test'}})

        id2 = client2.startSMS("test2", ["01234"])
        self.assertNotEqual(id1, id2)
        self.assertEqual(
            client1.status(),
            {id1: {'status': ['init', {}], 'name': 'test'}})
        self.assertEqual(
            client2.status(),
            {id2: {'status': ['init', {}], 'name': 'test2'}})
        self.assertEqual(
            admin.status(),
            {id1: {'status': ['init', {}], 'name': 'test'},
             id2: {'status': ['init', {}], 'name': 'test2'}})

        # Asking for another user's job id raises a Fault.
        self.assertRaises(xmlrpclib.Fault, client2.status, id1)
        self.assertRaises(xmlrpclib.Fault, client1.status, id2)

    def testGetProvider(self):
        """getProvider returns the expected provider names per message type."""
        client = self._client("test", "test")
        # sorted() is required here: list.sort() sorts in place and returns
        # None, so comparing its result to a list could never succeed.
        self.assertEqual(sorted(client.getProvider("sms")),
                         ["fax.de", "geonet", "sipgate", "smstrade"])
        self.assertEqual(sorted(client.getProvider("fax")),
                         ["fax.de", "geonet", "sipgate"])
        self.assertEqual(sorted(client.getProvider("mail")),
                         ["localhost"])
if __name__ == "__main__":
    # Executed as a script: let unittest discover and run the tests above.
    unittest.main()