# HG changeset patch # User Sandro Knauß # Date 1324227825 -3600 # Node ID fea4c6760ca56f8402ae2b7c6b74aa5fad592636 # Parent 265124610789e50fc68940e1bcfd76686a367644 ein wenig aufräumen diff -r 265124610789 -r fea4c6760ca5 iro/dump_test_log.py --- a/iro/dump_test_log.py Sun Dec 18 16:43:22 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,42 +0,0 @@ -import time, os, signal -LOG_FILE = 'test.log' - -log_file = open(LOG_FILE, 'a') - -def log(msg): - log_file.write(msg + '\n') - log_file.flush() - -def SigUSR1Handler(signum, frame): - print "Reacting on USR1 signal (signal 10)" - global log_file - log_file.close() - log_file = open(LOG_FILE, 'a') - return - -def init(): - if os.path.isfile('/var/usr/dump_test_log.pid'): - print 'Have to stop server first' - os.exit(1) - else: - print 'Starting server...' - #write process id file - f = open('/var/run/dump_test_log.pid', 'w') - f.write(str(os.getpid())) - f.flush() - f.close() - print 'Process start with pid ', os.getpid() - - signal.signal(signal.SIGUSR1, SigUSR1Handler) - -def main(): - init() - count = 1 - while True: - log('log line #%d, pid: %d' % (count, os.getpid())) - count = count + 1 - time.sleep(1) - -if __name__ == '__main__': - main() - diff -r 265124610789 -r fea4c6760ca5 iro/merlin --- a/iro/merlin Sun Dec 18 16:43:22 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,25 +0,0 @@ -#!/bin/bash -# -# merlin commando -# -# eine überprüfung auf korrekten aufruf findet nicht statt -# -# beispiel: -# -# ./merlin ./arthur -# -# startet programm arthur und wenn er stirbt, wird er sofort -# wiederbelebt. -# harmlose magie halt... :-) -# -LOG=/home/sandy/var/log/merlin_Iro.log -while : ; do - echo -n "$(date +'%F %T %Z') " >> $LOG - $1 status >> $LOG - if [ $? -eq 1 ]; then - echo $(date +'%F %T %Z') $1 neustarten >> $LOG - $1 start >> $LOG - fi - sleep 60 -done - diff -r 265124610789 -r fea4c6760ca5 iro/merlin_daemon --- a/iro/merlin_daemon Sun Dec 18 16:43:22 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,44 +0,0 @@ -#! /bin/sh -NAME="merlin" -DEAMON=/home/sandy/svn/iro/$NAME -DEAMON_OPTS="/home/sandy/svn/iro/MyIro_daemon" -PID=/home/sandy/var/run/$NAME.pid - -test -x $DEAMON || exit 0 - -. /lib/lsb/init-functions - -case "$1" in - start) - log_daemon_msg "Starting $NAME server" $NAME - if start-stop-daemon --start --quiet --oknodo --pidfile $PID --make-pidfile --background --startas $DEAMON -- $DEAMON_OPTS; then - log_end_msg 0 - else - log_end_msg 1 - fi - ;; - stop) - log_daemon_msg "Stopping $NAME server" $NAME - if start-stop-daemon --stop --quiet --oknodo --pidfile $PID; then - log_end_msg 0 - else - log_end_msg 1 - fi - ;; - - restart) - $0 stop - sleep 1 - $0 start - ;; - - status) - status_of_proc -p $PID $DEAMON $NAME && exit 0 || exit $? - ;; - - *) - log_action_msg "Usage: $0 {start|stop|restart|status}" - exit 1 -esac - -exit 0 diff -r 265124610789 -r fea4c6760ca5 iro/newinterface.py --- a/iro/newinterface.py Sun Dec 18 16:43:22 2011 +0100 +++ b/iro/newinterface.py Sun Dec 18 18:03:45 2011 +0100 @@ -14,12 +14,73 @@ import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s') + +class User(object): + def __init__(self,name,userhash): + self.name=name + self.userhash=userhash + + def __repr__(self): + return"User<'%s','%s'>"%(self.name,self.userhash) + +users={"1":User("spam","1"), + "2":User("foo","2") +} + +def getuser(userhash): + try: + return users[userhash] + except KeyError: + raise UserNotFound() + +def with_user(f): + def new_f(*args,**kargs): + args=list(args) + try: + logging.debug("Entering %s"%f.__name__) + try: + kargs["user"]=getuser(kargs["apikey"]) + del kargs["apikey"] + except KeyError: + kargs["user"]=getuser(args[1]) + del args[1] + ret=f(*args,**kargs) + logging.debug("Exited %s"%f.__name__) + return ret + except InterfaceException, e: + return e.dict() + new_f.__name__ = f.__name__ + return new_f + + +class InterfaceException(Exception): + def __init__(self, code=999, msg="Unbekannter Fehler."): + self.code=code + self.msg=msg + + def dict(self): + return {"code":self.code, + "msg":self.msg, + } + def __str__(self): + return "%i:%s"%(self.code,self.msg) + +class UserNotFound(InterfaceException): + def __init__(self): + InterfaceException.__init__(self, 901, "Der API-Key ist ungültig.") + +class ExternalException(InterfaceException): + def __init__(self): + InterfaceException.__init__(self, 950, "Fehler in externer API.") + + class Interface(object): '''class for a xmlrpc user ''' - def status(self, apikey, id=None, detailed=False): - '''Gibt den aktuellen Status eines Auftrages zurück. + @with_user + def status(self, user, id=None, detailed=False): + '''Gibt den aktuellen Status eines Auftrages oder Mehreren zurück. Keywords: apikey[string]: Der API Key @@ -33,9 +94,11 @@ ''' + #return user.status(id,detailed) return "" - def stop(self, apikey,id): + @with_user + def stop(self, user, id): '''Stoppt den angegeben Auftrag. Keywords: @@ -47,7 +110,8 @@ ''' return "" - def sms(self, apikey, message, recipients, route="default"): + @with_user + def sms(self, user, message, recipients, route="default"): '''Versendet eine SMS. Keywords: @@ -63,8 +127,8 @@ ''' return "" - - def fax(self, apikey, subject, fax, recipients, route="default"): + @with_user + def fax(self, user, subject, fax, recipients, route="default"): '''Versendet ein FAX. Keywords: @@ -81,7 +145,8 @@ ''' return "" - def mail(self, apikey, subject, body, recipients, frm, route="default"): + @with_user + def mail(self, user, subject, body, recipients, frm, route="default"): '''Versendet eine Email. Keywords: @@ -99,7 +164,8 @@ ''' return "" - def routes(self, apikey, typ): + @with_user + def routes(self, user, typ): '''Gibt eine Liste aller verfügbaren Provider zurück. Keywords: @@ -113,7 +179,8 @@ ''' return "" - def defaultRoute(self, apikey, typ): + @with_user + def defaultRoute(self, user, typ): '''Gibt den Standardprovider zurück. Keywords: @@ -128,7 +195,8 @@ ''' return "" - def statistic(self,apikey): + @with_user + def statistic(self, user): '''Gibt eine Statik zurück über die versendendeten Nachrichten und des Preises. Keywords: @@ -157,7 +225,6 @@ def __init__(self): xmlrpc.XMLRPC.__init__(self) Interface.__init__(self) - self.allowNone = True def lookupProcedure(self, procedurePath): logging.debug("lookupProcedure('%s')"%procedurePath) diff -r 265124610789 -r fea4c6760ca5 iro/tests/__init__.py diff -r 265124610789 -r fea4c6760ca5 iro/tests/stopableServer.py --- a/iro/tests/stopableServer.py Sun Dec 18 16:43:22 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,118 +0,0 @@ -import ConfigParser - -import threading - -from multiprocessing import Queue -from multiprocessing.managers import BaseManager - -from iro import xmlrpc,anbieter -from iro.user import User, Admin -from iro.iro import MySMTP,MySmstrade,MyUserDB -from iro.job import SMSJob, FAXJob, MailJob -from iro.joblist import Joblist -from iro.providerlist import Providerlist - -class StoppableXMLRPCServer(xmlrpc.SecureUserDBXMLRPCServer, threading.Thread): - running=False - def __init__(self, *args, **kwargs): - xmlrpc.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs) - threading.Thread.__init__(self) - self.timeout=.5 - - def start(self): - self.running=True - threading.Thread.start(self) - - - def run(self): - # *serve_forever* muss in einem eigenen Thread laufen, damit man es - # unterbrochen werden kann! - while (self.running): - try: - self.handle_request() - except : - break - - def stop(self): - if (self.running): - self.running=False - self.server_close() - self.join() - - def __del__(self): - self.stop() - - def __enter__(self): - self.start() - return self - - def __exit__(self,type,value,traceback): - self.stop() - - -class Internal: - pass - -def init_server(): - userlist=[{"name":"test","password":"test", "class":User}, - {"name":"test2","password":"test2", "class": User}, - {"name":"admin","password":"admin", "class": Admin}] - - - - class MyManager(BaseManager): - pass - - internal=Internal() - - MyManager.register('SMSJob', SMSJob) - MyManager.register('FaxJob', FAXJob) - MyManager.register('MailJob',MailJob) - MyManager.register('Providerlist',Providerlist) - manager = MyManager() - manager.start() - - internal.manager=manager - - #anbieter erzeugen und konfigurieren - sip=anbieter.sipgate() - sip.read_basic_config("iro.conf") - - localhost=MySMTP() - localhost.read_basic_config("iro.conf") - - smstrade=MySmstrade() - smstrade.read_basic_config("iro.conf") - - #Benutzerdatenbank erstellen - queue = Queue() - internal.queue=queue - provider=Providerlist() - internal.provider=provider - provider.add("sipgate", sip, ["sms", "fax", ]) - provider.add("smstrade", smstrade, ["sms", ]) - provider.add("geonet", None, ["sms", "fax", ]) - provider.add("fax.de", None, ["sms", "fax", ]) - provider.add("localhost", localhost, ["mail", ]) - provider.setDefault("sms","smstrade") - provider.setDefault("fax","sipgate") - provider.setDefault("mail","localhost") - jobqueue=Joblist(manager, queue, provider) - internal.jobqueue=jobqueue - userdb=MyUserDB(userlist,jobqueue) - internal.userdb=userdb - - - #Server starten - cp = ConfigParser.ConfigParser() - cp.read(["iro.conf"]) - cert=cp.get('server', 'cert') - key=cp.get('server', 'key') - serv = StoppableXMLRPCServer(addr=("localhost", 8000), - userdb=userdb, - certificate=cert,privatekey=key, - logRequests=False) - serv.relam="xmlrpc" - internal.serv=serv - return internal - diff -r 265124610789 -r fea4c6760ca5 iro/tests/test.pdf Binary file iro/tests/test.pdf has changed diff -r 265124610789 -r fea4c6760ca5 iro/tests/testJob.py --- a/iro/tests/testJob.py Sun Dec 18 16:43:22 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,82 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest - -import xmlrpclib -from stopableServer import init_server -from iro.anbieter.content import SMS,FAX,Mail - -class TestServer(unittest.TestCase): - - def setUp(self): - self.i = init_server() - self.serv=self.i.serv - self.serv.start() - - def tearDown(self): - self.serv.stop() - - - def SendSMS(self,msg): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - id=client.startSMS(msg,["01234", ] ) - self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': unicode(msg)}} ) - ele=self.i.queue.get(.1) - self.assertEqual(ele.getRecipients(),["01234", ] ) - self.assertNotEqual(ele.getMessage(),SMS('') ) - self.assertEqual(ele.getMessage(),SMS(msg) ) - - def testSimpleSMS(self): - self.SendSMS("test") - - def testSpecialCharacters(self): - self.SendSMS(u"!\"§$%&/()=?\'") - self.SendSMS(u"@ł€ł€¶ŧł¼¼½¬¬↓ŧ←ĸ↓→øđŋħ“”µ·…–|") - - def testSendFAX(self): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ" - id=client.startFAX("test",xmlrpclib.Binary(msg),["01234", ] ) - self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) - ele=self.i.queue.get(.1) - self.assertEqual(ele.getRecipients(),["01234", ] ) - self.assertEqual(ele.getMessage(),FAX('test','',[msg])) - - def testDoubleFAX(self): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ" - pdf=open('tests/test.pdf').read() - id=client.startFAX("test",[xmlrpclib.Binary(msg),xmlrpclib.Binary(pdf)],["01234", ] ) - self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) - ele=self.i.queue.get(.1) - self.assertEqual(ele.getRecipients(),["01234", ] ) - self.assertEqual(ele.getMessage(),FAX('test','',[msg, pdf])) - - def testSendMail(self): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - msg=u"2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ" - id=client.startMail("test",msg,["test@test.de", ],'absender@test.de' ) - self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) - ele=self.i.queue.get(.1) - self.assertEqual(ele.getRecipients(),["test@test.de", ] ) - self.assertEqual(ele.getMessage(),Mail('test',msg,'absender@test.de')) - self.assertEqual(ele.getMessage().as_string(),"""Content-Type: text/plain; charset="utf-8" -MIME-Version: 1.0 -Content-Transfer-Encoding: base64 -Subject: =?utf-8?q?test?= - -MjEzNHdlcmdzZGZnNHc1NnEzNDEzNMOmxb/DsMSRw6bDsMW/xJHFi8KzQMK8xafDpsOwxJHFi8WC -4oKswrbFp+KCrMK2xac= -""") - sub=u"³¼½ſðđŋſ€¼½ÖÄÜß" - id=client.startMail(sub,msg,["test@test.de", ],'absender@test.de' ) - self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': sub}}) - ele=self.i.queue.get(.1) - self.assertEqual(ele.getMessage(),Mail(sub, msg, 'absender@test.de')) - -if __name__ == "__main__": - unittest.main() diff -r 265124610789 -r fea4c6760ca5 iro/tests/testWorker.py --- a/iro/tests/testWorker.py Sun Dec 18 16:43:22 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,147 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest -import logging -from time import sleep - -from multiprocessing import Queue -from multiprocessing.managers import BaseManager, ListProxy - -from iro.worker import Worker -from iro.job import Job, SMSJob -from iro.anbieter.anbieter import anbieter -from iro.anbieter.content import SMS -from iro.providerlist import Providerlist - -from logging.handlers import BufferingHandler - -class MyHandler(BufferingHandler): - def __init__(self,buffer=None): - '''BufferingHandler takes a "capacity" argument - so as to know when to flush. As we're overriding - shouldFlush anyway, we can set a capacity of zero. - You can call flush() manually to clear out the - buffer. - buffer: log messages to this buffer, needed f.ex for processes or threads''' - BufferingHandler.__init__(self, 0) - self.buffer=buffer - - def shouldFlush(self): - return False - - def emit(self, record): - if record.exc_info: #sonst geht das append schief, weil nicht picklebar - record.exc_info=record.exc_info[:-1] - self.buffer.append(record.__dict__) - - -class BadJob(Job): - def start(self,id=None): - Job.start(self,id) - raise Exception("Error") - -#einen Manager anlegen, der Job und eine Liste anbietet -class MyManager(BaseManager): - pass -MyManager.register('Job', Job) -MyManager.register('SMSJob', SMSJob) -MyManager.register('BadJob', BadJob) -MyManager.register('list', list, ListProxy) -MyManager.register('Providerlist',Providerlist) - -class TestWorker(unittest.TestCase): - def setUp(self): - #erstelle eine Queue&Manager - self.manager = MyManager() - self.queue = Queue() - - #manager starten, damit wir Logging anpassen können - self.manager.start() - self.setUpLogging() - - self.providerlist=self.manager.Providerlist() - self.providerlist.add("test", anbieter() , ["sms", ]) - - #eigentlich Workerprocess starten - self.worker= Worker(self.queue) - self.worker.start() - - def tearDown(self): - #Thread&Queue stoppen - self.stop() - - #Logging änderungen rückgängig - self.tearDownLogging() - self.manager.shutdown() - - def stop(self): - self.queue.close() - self.queue.join_thread() - self.worker.terminate() - - - def setUpLogging(self): - '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind''' - #wir brauchen eine threadsichere liste - self.buffer=self.manager.list() - - #Handler erstellen, der in den Buffer schreibt - self.handler = h = MyHandler(self.buffer) - self.logger =l= logging.getLogger() - - #Level anpassen - l.setLevel(logging.DEBUG) - h.setLevel(logging.DEBUG) - l.addHandler(h) - - def tearDownLogging(self): - '''crazy logging hacks wieder entfernen''' - self.logger.removeHandler(self.handler) - self.logger.setLevel(logging.NOTSET) - self.handler.close() - - - def testJob(self): - '''einen Job verarbeiten''' - job=self.manager.Job(None,None,"test") - self.assertEqual(job.getStatus(),("init",{})) - self.queue.put(job) - sleep(.1) - self.stop() - self.assertEqual(job.getStatus(),("started",{})) - self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], - [(20,'Workerprocess läuft nun...'), - (20,'ein neuer Job(1)'), - (20,'Job(1) fertig ;)')]) - - def testBadJob(self): - '''einen Job verarbeiten, der fehlschlägt''' - job=self.manager.BadJob(None,None,"test") - self.assertEqual(job.getStatus(),("init",{})) - self.queue.put(job) - sleep(.1) - self.stop() - self.assertEqual(job.getStatus(),("error",{})) - self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], - [(20,'Workerprocess läuft nun...'), - (20,'ein neuer Job(1)'), - (40,'Job(1) fehlgeschlagen :(')]) - error=Exception('Error') - self.assertEqual(self.buffer[-1]['exc_info'][0],type(error)) - self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error)) - - def testSMSJob(self): - job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345]) - self.assertEqual(job.getStatus(),("init",{})) - self.queue.put(job) - sleep(.1) - self.stop() - self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []})) - self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], - [(20,'Workerprocess läuft nun...'), - (20,'ein neuer Job(1)'), - (20,'Job(1) fertig ;)')]) - - -if __name__ == "__main__": - unittest.main() diff -r 265124610789 -r fea4c6760ca5 iro/tests/testXMLRPCServer.py --- a/iro/tests/testXMLRPCServer.py Sun Dec 18 16:43:22 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,71 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest - -import xmlrpclib -from stopableServer import init_server - -class TestServer(unittest.TestCase): - - def setUp(self): - self.i = init_server() - self.serv=self.i.serv - - self.serv.start() - - def tearDown(self): - self.serv.stop() - - def testLogin(self): - self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {}) - self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {}) - self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status) - self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status) - - def testsendSMS(self): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - id=client.startSMS("test",["01234", ] ) - self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) - - def testTwoUser(self): - u1="https://test:test@localhost:8000" - u2="https://test2:test2@localhost:8000" - admin="https://admin:admin@localhost:8000" - client1=xmlrpclib.Server(u1) - client2=xmlrpclib.Server(u2) - admin=xmlrpclib.Server(admin) - id1=client1.startSMS("test",["01234"] ) - self.assertEqual(client2.status(),{} ) - self.assertEqual(admin.status(id1),{id1: {'status': ['init', {}], 'name': 'test'}} ) - id2=client2.startSMS("test2",["01234"] ) - self.assertNotEqual(id1, id2) - self.assertEqual(client1.status(),{id1: {'status': ['init', {}], 'name': 'test'}}) - self.assertEqual(client2.status(),{id2: {'status': ['init', {}], 'name': 'test2'}}) - self.assertEqual(admin.status(),{id1: {'status': ['init', {}], 'name': 'test'}, - id2: {'status': ['init', {}], 'name': 'test2'}} ) - - self.assertEqual(client2.status(id1), {}) - self.assertEqual(client1.status(id2), {}) - - def testGetProvider(self): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - self.assertEqual(client.getProvider("sms"), ["fax.de","geonet", "sipgate", "smstrade"]) - self.assertEqual(client.getProvider("fax"), ["fax.de","geonet", "sipgate"]) - self.assertEqual(client.getProvider("mail"), ["localhost"]) - - self.assertRaises(xmlrpclib.ProtocolError,client.getProvider, "temp") - - def testGetDefault(self): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - self.assertEqual(client.getDefaultProvider("sms"), "smstrade") - self.assertEqual(client.getDefaultProvider("fax"),"sipgate") - self.assertEqual(client.getDefaultProvider("mail"), "localhost") - - self.assertRaises(xmlrpclib.ProtocolError,client.getDefaultProvider, "temp") - - -if __name__ == "__main__": - unittest.main() diff -r 265124610789 -r fea4c6760ca5 iro/tests/testloglock.py --- a/iro/tests/testloglock.py Sun Dec 18 16:43:22 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,98 +0,0 @@ - -import unittest -import os -import tempfile -import signal -import threading -import time - -class testLogLock(unittest.TestCase): - def test_ThreadingAndLocks(self): - #create a thread, have that thread grab a lock, and sleep. - fd, file = tempfile.mkstemp('.nonlog') - _lock = threading.RLock() - os.close(fd) - def locker(): - os.write(fd, 'Thread acquiring lock\n') - _lock.acquire() - os.write(fd, 'Thread acquired lock\n') - time.sleep(0.4) - os.write(fd, 'Thread releasing lock\n') - _lock.release() - os.write(fd, 'Thread released lock\n') - - #Then in the main thread, throw a signal to self - def handleSignal(sigNum, frame): - os.write(fd, 'Main Thread acquiring lock\n') - lock = False - endtime = time.time() + 1.0 - while not lock: - lock = _lock.acquire(blocking=0) - time.sleep(0.01) - if time.time() > endtime: - break - if not lock: - os.write(fd, 'Main Thread could not acquire lock\n') - return - os.write(fd, 'Main Thread acquired lock\n') - os.write(fd, 'Main Thread releasing lock\n') - _lock.release() - os.write(fd, 'Main Thread released lock\n') - - sighndlr = signal.signal(signal.SIGUSR1, handleSignal) - try: - fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT) - thread = threading.Thread(target=locker) - thread.start() - time.sleep(0.1) - os.kill(os.getpid(), signal.SIGUSR1) - thread.join() - - #check the results - os.close(fd) - fileconts = open(file, 'r').read() - self.assertTrue('Main Thread released lock' in fileconts) - self.assertEqual(fileconts, - '''Thread acquiring lock -Thread acquired lock -Main Thread acquiring lock -Thread releasing lock -Thread released lock -Main Thread acquired lock -Main Thread releasing lock -Main Thread released lock -''') - - #Now try after acquiring the lock from the main thread - fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT) - _lock.acquire() - thread = threading.Thread(target=locker) - thread.start() - time.sleep(0.1) - os.kill(os.getpid(), signal.SIGUSR1) - _lock.release() - thread.join() - os.close(fd) - fileconts = open(file, 'r').read() - self.assertEqual(fileconts, - '''Thread acquiring lock -Main Thread acquiring lock -Main Thread acquired lock -Main Thread releasing lock -Main Thread released lock -Thread acquired lock -Thread releasing lock -Thread released lock -''') - - finally: - signal.signal(signal.SIGUSR1, sighndlr) - try: - os.close(fd) - except OSError: - pass - try: - os.unlink(file) - except OSError: - pass - diff -r 265124610789 -r fea4c6760ca5 tests/__init__.py diff -r 265124610789 -r fea4c6760ca5 tests/dump_test_log.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/dump_test_log.py Sun Dec 18 18:03:45 2011 +0100 @@ -0,0 +1,42 @@ +import time, os, signal +LOG_FILE = 'test.log' + +log_file = open(LOG_FILE, 'a') + +def log(msg): + log_file.write(msg + '\n') + log_file.flush() + +def SigUSR1Handler(signum, frame): + print "Reacting on USR1 signal (signal 10)" + global log_file + log_file.close() + log_file = open(LOG_FILE, 'a') + return + +def init(): + if os.path.isfile('/var/usr/dump_test_log.pid'): + print 'Have to stop server first' + os.exit(1) + else: + print 'Starting server...' + #write process id file + f = open('/var/run/dump_test_log.pid', 'w') + f.write(str(os.getpid())) + f.flush() + f.close() + print 'Process start with pid ', os.getpid() + + signal.signal(signal.SIGUSR1, SigUSR1Handler) + +def main(): + init() + count = 1 + while True: + log('log line #%d, pid: %d' % (count, os.getpid())) + count = count + 1 + time.sleep(1) + +if __name__ == '__main__': + main() + diff -r 265124610789 -r fea4c6760ca5 tests/stopableServer.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/stopableServer.py Sun Dec 18 18:03:45 2011 +0100 @@ -0,0 +1,118 @@ +import ConfigParser + +import threading + +from multiprocessing import Queue +from multiprocessing.managers import BaseManager + +from iro import xmlrpc,anbieter +from iro.user import User, Admin +from iro.iro import MySMTP,MySmstrade,MyUserDB +from iro.job import SMSJob, FAXJob, MailJob +from iro.joblist import Joblist +from iro.providerlist import Providerlist + +class StoppableXMLRPCServer(xmlrpc.SecureUserDBXMLRPCServer, threading.Thread): + running=False + def __init__(self, *args, **kwargs): + xmlrpc.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs) + threading.Thread.__init__(self) + self.timeout=.5 + + def start(self): + self.running=True + threading.Thread.start(self) + + + def run(self): + # *serve_forever* muss in einem eigenen Thread laufen, damit man es + # unterbrochen werden kann! + while (self.running): + try: + self.handle_request() + except : + break + + def stop(self): + if (self.running): + self.running=False + self.server_close() + self.join() + + def __del__(self): + self.stop() + + def __enter__(self): + self.start() + return self + + def __exit__(self,type,value,traceback): + self.stop() + + +class Internal: + pass + +def init_server(): + userlist=[{"name":"test","password":"test", "class":User}, + {"name":"test2","password":"test2", "class": User}, + {"name":"admin","password":"admin", "class": Admin}] + + + + class MyManager(BaseManager): + pass + + internal=Internal() + + MyManager.register('SMSJob', SMSJob) + MyManager.register('FaxJob', FAXJob) + MyManager.register('MailJob',MailJob) + MyManager.register('Providerlist',Providerlist) + manager = MyManager() + manager.start() + + internal.manager=manager + + #anbieter erzeugen und konfigurieren + sip=anbieter.sipgate() + sip.read_basic_config("iro.conf") + + localhost=MySMTP() + localhost.read_basic_config("iro.conf") + + smstrade=MySmstrade() + smstrade.read_basic_config("iro.conf") + + #Benutzerdatenbank erstellen + queue = Queue() + internal.queue=queue + provider=Providerlist() + internal.provider=provider + provider.add("sipgate", sip, ["sms", "fax", ]) + provider.add("smstrade", smstrade, ["sms", ]) + provider.add("geonet", None, ["sms", "fax", ]) + provider.add("fax.de", None, ["sms", "fax", ]) + provider.add("localhost", localhost, ["mail", ]) + provider.setDefault("sms","smstrade") + provider.setDefault("fax","sipgate") + provider.setDefault("mail","localhost") + jobqueue=Joblist(manager, queue, provider) + internal.jobqueue=jobqueue + userdb=MyUserDB(userlist,jobqueue) + internal.userdb=userdb + + + #Server starten + cp = ConfigParser.ConfigParser() + cp.read(["iro.conf"]) + cert=cp.get('server', 'cert') + key=cp.get('server', 'key') + serv = StoppableXMLRPCServer(addr=("localhost", 8000), + userdb=userdb, + certificate=cert,privatekey=key, + logRequests=False) + serv.relam="xmlrpc" + internal.serv=serv + return internal + diff -r 265124610789 -r fea4c6760ca5 tests/test.pdf Binary file tests/test.pdf has changed diff -r 265124610789 -r fea4c6760ca5 tests/testJob.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/testJob.py Sun Dec 18 18:03:45 2011 +0100 @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- + +import unittest + +import xmlrpclib +from stopableServer import init_server +from iro.anbieter.content import SMS,FAX,Mail + +class TestServer(unittest.TestCase): + + def setUp(self): + self.i = init_server() + self.serv=self.i.serv + self.serv.start() + + def tearDown(self): + self.serv.stop() + + + def SendSMS(self,msg): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + id=client.startSMS(msg,["01234", ] ) + self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': unicode(msg)}} ) + ele=self.i.queue.get(.1) + self.assertEqual(ele.getRecipients(),["01234", ] ) + self.assertNotEqual(ele.getMessage(),SMS('') ) + self.assertEqual(ele.getMessage(),SMS(msg) ) + + def testSimpleSMS(self): + self.SendSMS("test") + + def testSpecialCharacters(self): + self.SendSMS(u"!\"§$%&/()=?\'") + self.SendSMS(u"@ł€ł€¶ŧł¼¼½¬¬↓ŧ←ĸ↓→øđŋħ“”µ·…–|") + + def testSendFAX(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ" + id=client.startFAX("test",xmlrpclib.Binary(msg),["01234", ] ) + self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) + ele=self.i.queue.get(.1) + self.assertEqual(ele.getRecipients(),["01234", ] ) + self.assertEqual(ele.getMessage(),FAX('test','',[msg])) + + def testDoubleFAX(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ" + pdf=open('tests/test.pdf').read() + id=client.startFAX("test",[xmlrpclib.Binary(msg),xmlrpclib.Binary(pdf)],["01234", ] ) + self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) + ele=self.i.queue.get(.1) + self.assertEqual(ele.getRecipients(),["01234", ] ) + self.assertEqual(ele.getMessage(),FAX('test','',[msg, pdf])) + + def testSendMail(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + msg=u"2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ" + id=client.startMail("test",msg,["test@test.de", ],'absender@test.de' ) + self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) + ele=self.i.queue.get(.1) + self.assertEqual(ele.getRecipients(),["test@test.de", ] ) + self.assertEqual(ele.getMessage(),Mail('test',msg,'absender@test.de')) + self.assertEqual(ele.getMessage().as_string(),"""Content-Type: text/plain; charset="utf-8" +MIME-Version: 1.0 +Content-Transfer-Encoding: base64 +Subject: =?utf-8?q?test?= + +MjEzNHdlcmdzZGZnNHc1NnEzNDEzNMOmxb/DsMSRw6bDsMW/xJHFi8KzQMK8xafDpsOwxJHFi8WC +4oKswrbFp+KCrMK2xac= +""") + sub=u"³¼½ſðđŋſ€¼½ÖÄÜß" + id=client.startMail(sub,msg,["test@test.de", ],'absender@test.de' ) + self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': sub}}) + ele=self.i.queue.get(.1) + self.assertEqual(ele.getMessage(),Mail(sub, msg, 'absender@test.de')) + +if __name__ == "__main__": + unittest.main() diff -r 265124610789 -r fea4c6760ca5 tests/testWorker.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/testWorker.py Sun Dec 18 18:03:45 2011 +0100 @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- + +import unittest +import logging +from time import sleep + +from multiprocessing import Queue +from multiprocessing.managers import BaseManager, ListProxy + +from iro.worker import Worker +from iro.job import Job, SMSJob +from iro.anbieter.anbieter import anbieter +from iro.anbieter.content import SMS +from iro.providerlist import Providerlist + +from logging.handlers import BufferingHandler + +class MyHandler(BufferingHandler): + def __init__(self,buffer=None): + '''BufferingHandler takes a "capacity" argument + so as to know when to flush. As we're overriding + shouldFlush anyway, we can set a capacity of zero. + You can call flush() manually to clear out the + buffer. + buffer: log messages to this buffer, needed f.ex for processes or threads''' + BufferingHandler.__init__(self, 0) + self.buffer=buffer + + def shouldFlush(self): + return False + + def emit(self, record): + if record.exc_info: #sonst geht das append schief, weil nicht picklebar + record.exc_info=record.exc_info[:-1] + self.buffer.append(record.__dict__) + + +class BadJob(Job): + def start(self,id=None): + Job.start(self,id) + raise Exception("Error") + +#einen Manager anlegen, der Job und eine Liste anbietet +class MyManager(BaseManager): + pass +MyManager.register('Job', Job) +MyManager.register('SMSJob', SMSJob) +MyManager.register('BadJob', BadJob) +MyManager.register('list', list, ListProxy) +MyManager.register('Providerlist',Providerlist) + +class TestWorker(unittest.TestCase): + def setUp(self): + #erstelle eine Queue&Manager + self.manager = MyManager() + self.queue = Queue() + + #manager starten, damit wir Logging anpassen können + self.manager.start() + self.setUpLogging() + + self.providerlist=self.manager.Providerlist() + self.providerlist.add("test", anbieter() , ["sms", ]) + + #eigentlich Workerprocess starten + self.worker= Worker(self.queue) + self.worker.start() + + def tearDown(self): + #Thread&Queue stoppen + self.stop() + + #Logging änderungen rückgängig + self.tearDownLogging() + self.manager.shutdown() + + def stop(self): + self.queue.close() + self.queue.join_thread() + self.worker.terminate() + + + def setUpLogging(self): + '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind''' + #wir brauchen eine threadsichere liste + self.buffer=self.manager.list() + + #Handler erstellen, der in den Buffer schreibt + self.handler = h = MyHandler(self.buffer) + self.logger =l= logging.getLogger() + + #Level anpassen + l.setLevel(logging.DEBUG) + h.setLevel(logging.DEBUG) + l.addHandler(h) + + def tearDownLogging(self): + '''crazy logging hacks wieder entfernen''' + self.logger.removeHandler(self.handler) + self.logger.setLevel(logging.NOTSET) + self.handler.close() + + + def testJob(self): + '''einen Job verarbeiten''' + job=self.manager.Job(None,None,"test") + self.assertEqual(job.getStatus(),("init",{})) + self.queue.put(job) + sleep(.1) + self.stop() + self.assertEqual(job.getStatus(),("started",{})) + self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], + [(20,'Workerprocess läuft nun...'), + (20,'ein neuer Job(1)'), + (20,'Job(1) fertig ;)')]) + + def testBadJob(self): + '''einen Job verarbeiten, der fehlschlägt''' + job=self.manager.BadJob(None,None,"test") + self.assertEqual(job.getStatus(),("init",{})) + self.queue.put(job) + sleep(.1) + self.stop() + self.assertEqual(job.getStatus(),("error",{})) + self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], + [(20,'Workerprocess läuft nun...'), + (20,'ein neuer Job(1)'), + (40,'Job(1) fehlgeschlagen :(')]) + error=Exception('Error') + self.assertEqual(self.buffer[-1]['exc_info'][0],type(error)) + self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error)) + + def testSMSJob(self): + job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345]) + self.assertEqual(job.getStatus(),("init",{})) + self.queue.put(job) + sleep(.1) + self.stop() + self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []})) + self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], + [(20,'Workerprocess läuft nun...'), + (20,'ein neuer Job(1)'), + (20,'Job(1) fertig ;)')]) + + +if __name__ == "__main__": + unittest.main() diff -r 265124610789 -r fea4c6760ca5 tests/testXMLRPCServer.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/testXMLRPCServer.py Sun Dec 18 18:03:45 2011 +0100 @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- + +import unittest + +import xmlrpclib +from stopableServer import init_server + +class TestServer(unittest.TestCase): + + def setUp(self): + self.i = init_server() + self.serv=self.i.serv + + self.serv.start() + + def tearDown(self): + self.serv.stop() + + def testLogin(self): + self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {}) + self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {}) + self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status) + self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status) + + def testsendSMS(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + id=client.startSMS("test",["01234", ] ) + self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) + + def testTwoUser(self): + u1="https://test:test@localhost:8000" + u2="https://test2:test2@localhost:8000" + admin="https://admin:admin@localhost:8000" + client1=xmlrpclib.Server(u1) + client2=xmlrpclib.Server(u2) + admin=xmlrpclib.Server(admin) + id1=client1.startSMS("test",["01234"] ) + self.assertEqual(client2.status(),{} ) + self.assertEqual(admin.status(id1),{id1: {'status': ['init', {}], 'name': 'test'}} ) + id2=client2.startSMS("test2",["01234"] ) + self.assertNotEqual(id1, id2) + self.assertEqual(client1.status(),{id1: {'status': ['init', {}], 'name': 'test'}}) + self.assertEqual(client2.status(),{id2: {'status': ['init', {}], 'name': 'test2'}}) + self.assertEqual(admin.status(),{id1: {'status': ['init', {}], 'name': 'test'}, + id2: {'status': ['init', {}], 'name': 'test2'}} ) + + self.assertEqual(client2.status(id1), {}) + self.assertEqual(client1.status(id2), {}) + + def testGetProvider(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + self.assertEqual(client.getProvider("sms"), ["fax.de","geonet", "sipgate", "smstrade"]) + self.assertEqual(client.getProvider("fax"), ["fax.de","geonet", "sipgate"]) + self.assertEqual(client.getProvider("mail"), ["localhost"]) + + self.assertRaises(xmlrpclib.ProtocolError,client.getProvider, "temp") + + def testGetDefault(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + self.assertEqual(client.getDefaultProvider("sms"), "smstrade") + self.assertEqual(client.getDefaultProvider("fax"),"sipgate") + self.assertEqual(client.getDefaultProvider("mail"), "localhost") + + self.assertRaises(xmlrpclib.ProtocolError,client.getDefaultProvider, "temp") + + +if __name__ == "__main__": + unittest.main() diff -r 265124610789 -r fea4c6760ca5 tests/testloglock.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/testloglock.py Sun Dec 18 18:03:45 2011 +0100 @@ -0,0 +1,98 @@ + +import unittest +import os +import tempfile +import signal +import threading +import time + +class testLogLock(unittest.TestCase): + def test_ThreadingAndLocks(self): + #create a thread, have that thread grab a lock, and sleep. + fd, file = tempfile.mkstemp('.nonlog') + _lock = threading.RLock() + os.close(fd) + def locker(): + os.write(fd, 'Thread acquiring lock\n') + _lock.acquire() + os.write(fd, 'Thread acquired lock\n') + time.sleep(0.4) + os.write(fd, 'Thread releasing lock\n') + _lock.release() + os.write(fd, 'Thread released lock\n') + + #Then in the main thread, throw a signal to self + def handleSignal(sigNum, frame): + os.write(fd, 'Main Thread acquiring lock\n') + lock = False + endtime = time.time() + 1.0 + while not lock: + lock = _lock.acquire(blocking=0) + time.sleep(0.01) + if time.time() > endtime: + break + if not lock: + os.write(fd, 'Main Thread could not acquire lock\n') + return + os.write(fd, 'Main Thread acquired lock\n') + os.write(fd, 'Main Thread releasing lock\n') + _lock.release() + os.write(fd, 'Main Thread released lock\n') + + sighndlr = signal.signal(signal.SIGUSR1, handleSignal) + try: + fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT) + thread = threading.Thread(target=locker) + thread.start() + time.sleep(0.1) + os.kill(os.getpid(), signal.SIGUSR1) + thread.join() + + #check the results + os.close(fd) + fileconts = open(file, 'r').read() + self.assertTrue('Main Thread released lock' in fileconts) + self.assertEqual(fileconts, + '''Thread acquiring lock +Thread acquired lock +Main Thread acquiring lock +Thread releasing lock +Thread released lock +Main Thread acquired lock +Main Thread releasing lock +Main Thread released lock +''') + + #Now try after acquiring the lock from the main thread + fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT) + _lock.acquire() + thread = threading.Thread(target=locker) + thread.start() + time.sleep(0.1) + os.kill(os.getpid(), signal.SIGUSR1) + _lock.release() + thread.join() + os.close(fd) + fileconts = open(file, 'r').read() + self.assertEqual(fileconts, + '''Thread acquiring lock +Main Thread acquiring lock +Main Thread acquired lock +Main Thread releasing lock +Main Thread released lock +Thread acquired lock +Thread releasing lock +Thread released lock +''') + + finally: + signal.signal(signal.SIGUSR1, sighndlr) + try: + os.close(fd) + except OSError: + pass + try: + os.unlink(file) + except OSError: + pass +