--- a/iro/controller/database.py Tue Jan 10 06:07:25 2012 +0100
+++ b/iro/controller/database.py Tue Jan 10 06:11:46 2012 +0100
@@ -1,5 +1,10 @@
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, pool
+
from sqlalchemy.orm import sessionmaker
+
+
+from twisted.internet import threads
+
from ..model import Base
engine = create_engine('sqlite:///:memory:', echo=True)
@@ -9,10 +14,16 @@
Session = sessionmaker(bind=engine)
+def toThread(f):
+ def wrapper(*args, **kwargs):
+ return threads.deferToThread(f, *args, **kwargs)
+ return wrapper
+
+
class WithSession():
def __init__(self,autocommit=False):
self.autocommit=autocommit
-
+
def __enter__(self):
self.conn = engine.connect()
self.session = Session(bind=self.conn)
@@ -27,3 +38,20 @@
self.session.close()
self.conn.close()
+class DBDefer(object):
+ def __init__(self, dsn, poolclass = pool.SingletonThreadPool, *args, **kargs):
+ self.engine = create_engine(dsn, poolclass=poolclass, *args, **kargs)
+
+ def __call__(self, func):
+ @toThread
+ def wrapper(*args, **kwargs):
+ session = sessionmaker(bind=self.engine)()
+ try:
+ print func,args, kwargs
+ return func(*args, session=session, **kwargs)
+ except:
+ session.rollback()
+ raise
+ finally:
+ session.close()
+ return wrapper
--- a/iro/controller/user.py Tue Jan 10 06:07:25 2012 +0100
+++ b/iro/controller/user.py Tue Jan 10 06:11:46 2012 +0100
@@ -1,7 +1,5 @@
-import logging
import re
-from .database import WithSession
from ..model import User
from ..error import UserNotFound, InterfaceException, ValidateException
@@ -24,30 +22,31 @@
@validate(userhash=rehash)
-def getuser(userhash):
- with WithSession() as session:
- user = session.query(User).filter_by(apikey=userhash).first()
- if user is None:
- raise UserNotFound()
- else:
- return user
+def getuser(userhash, session):
+ user = session.query(User).filter_by(apikey=userhash).first()
+ if user is None:
+ raise UserNotFound()
+ else:
+ return user
def with_user(f):
def new_f(*args,**kargs):
args=list(args)
- logging.debug("Entering %s"%f.__name__)
try:
- kargs["user"]=getuser(userhash = kargs["apikey"])
+ userhash = kargs["apikey"]
del kargs["apikey"]
except KeyError:
try:
- kargs["user"]=getuser(userhash = args[1])
+ userhash = args[1]
+ del args[1]
except IndexError:
raise InterfaceException()
- del args[1]
- ret=f(*args,**kargs)
- logging.debug("Exited %s"%f.__name__)
- return ret
+
+ def _gotResult(_user):
+ kargs["user"]=_user
+ return f(*args,**kargs)
+ return getuser(userhash=userhash).addCallback(_gotResult)
+
new_f.__name__ = f.__name__
return new_f
--- a/tests/dump_test_log.py Tue Jan 10 06:07:25 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,42 +0,0 @@
-import time, os, signal
-LOG_FILE = 'test.log'
-
-log_file = open(LOG_FILE, 'a')
-
-def log(msg):
- log_file.write(msg + '\n')
- log_file.flush()
-
-def SigUSR1Handler(signum, frame):
- print "Reacting on USR1 signal (signal 10)"
- global log_file
- log_file.close()
- log_file = open(LOG_FILE, 'a')
- return
-
-def init():
- if os.path.isfile('/var/usr/dump_test_log.pid'):
- print 'Have to stop server first'
- os.exit(1)
- else:
- print 'Starting server...'
- #write process id file
- f = open('/var/run/dump_test_log.pid', 'w')
- f.write(str(os.getpid()))
- f.flush()
- f.close()
- print 'Process start with pid ', os.getpid()
-
- signal.signal(signal.SIGUSR1, SigUSR1Handler)
-
-def main():
- init()
- count = 1
- while True:
- log('log line #%d, pid: %d' % (count, os.getpid()))
- count = count + 1
- time.sleep(1)
-
-if __name__ == '__main__':
- main()
-
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/old/dump_test_log.py Tue Jan 10 06:11:46 2012 +0100
@@ -0,0 +1,42 @@
+import time, os, signal
+LOG_FILE = 'test.log'
+
+log_file = open(LOG_FILE, 'a')
+
+def log(msg):
+ log_file.write(msg + '\n')
+ log_file.flush()
+
+def SigUSR1Handler(signum, frame):
+ print "Reacting on USR1 signal (signal 10)"
+ global log_file
+ log_file.close()
+ log_file = open(LOG_FILE, 'a')
+ return
+
+def init():
+ if os.path.isfile('/var/usr/dump_test_log.pid'):
+ print 'Have to stop server first'
+ os.exit(1)
+ else:
+ print 'Starting server...'
+ #write process id file
+ f = open('/var/run/dump_test_log.pid', 'w')
+ f.write(str(os.getpid()))
+ f.flush()
+ f.close()
+ print 'Process start with pid ', os.getpid()
+
+ signal.signal(signal.SIGUSR1, SigUSR1Handler)
+
+def main():
+ init()
+ count = 1
+ while True:
+ log('log line #%d, pid: %d' % (count, os.getpid()))
+ count = count + 1
+ time.sleep(1)
+
+if __name__ == '__main__':
+ main()
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/old/stopableServer.py Tue Jan 10 06:11:46 2012 +0100
@@ -0,0 +1,118 @@
+import ConfigParser
+
+import threading
+
+from multiprocessing import Queue
+from multiprocessing.managers import BaseManager
+
+from iro import xmlrpc,anbieter
+from iro.user import User, Admin
+from iro.iro import MySMTP,MySmstrade,MyUserDB
+from iro.job import SMSJob, FAXJob, MailJob
+from iro.joblist import Joblist
+from iro.providerlist import Providerlist
+
+class StoppableXMLRPCServer(xmlrpc.SecureUserDBXMLRPCServer, threading.Thread):
+ running=False
+ def __init__(self, *args, **kwargs):
+ xmlrpc.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
+ threading.Thread.__init__(self)
+ self.timeout=.5
+
+ def start(self):
+ self.running=True
+ threading.Thread.start(self)
+
+
+ def run(self):
+ # *serve_forever* muss in einem eigenen Thread laufen, damit man es
+ # unterbrochen werden kann!
+ while (self.running):
+ try:
+ self.handle_request()
+ except :
+ break
+
+ def stop(self):
+ if (self.running):
+ self.running=False
+ self.server_close()
+ self.join()
+
+ def __del__(self):
+ self.stop()
+
+ def __enter__(self):
+ self.start()
+ return self
+
+ def __exit__(self,type,value,traceback):
+ self.stop()
+
+
+class Internal:
+ pass
+
+def init_server():
+ userlist=[{"name":"test","password":"test", "class":User},
+ {"name":"test2","password":"test2", "class": User},
+ {"name":"admin","password":"admin", "class": Admin}]
+
+
+
+ class MyManager(BaseManager):
+ pass
+
+ internal=Internal()
+
+ MyManager.register('SMSJob', SMSJob)
+ MyManager.register('FaxJob', FAXJob)
+ MyManager.register('MailJob',MailJob)
+ MyManager.register('Providerlist',Providerlist)
+ manager = MyManager()
+ manager.start()
+
+ internal.manager=manager
+
+ #anbieter erzeugen und konfigurieren
+ sip=anbieter.sipgate()
+ sip.read_basic_config("iro.conf")
+
+ localhost=MySMTP()
+ localhost.read_basic_config("iro.conf")
+
+ smstrade=MySmstrade()
+ smstrade.read_basic_config("iro.conf")
+
+ #Benutzerdatenbank erstellen
+ queue = Queue()
+ internal.queue=queue
+ provider=Providerlist()
+ internal.provider=provider
+ provider.add("sipgate", sip, ["sms", "fax", ])
+ provider.add("smstrade", smstrade, ["sms", ])
+ provider.add("geonet", None, ["sms", "fax", ])
+ provider.add("fax.de", None, ["sms", "fax", ])
+ provider.add("localhost", localhost, ["mail", ])
+ provider.setDefault("sms","smstrade")
+ provider.setDefault("fax","sipgate")
+ provider.setDefault("mail","localhost")
+ jobqueue=Joblist(manager, queue, provider)
+ internal.jobqueue=jobqueue
+ userdb=MyUserDB(userlist,jobqueue)
+ internal.userdb=userdb
+
+
+ #Server starten
+ cp = ConfigParser.ConfigParser()
+ cp.read(["iro.conf"])
+ cert=cp.get('server', 'cert')
+ key=cp.get('server', 'key')
+ serv = StoppableXMLRPCServer(addr=("localhost", 8000),
+ userdb=userdb,
+ certificate=cert,privatekey=key,
+ logRequests=False)
+ serv.relam="xmlrpc"
+ internal.serv=serv
+ return internal
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/old/testJob.py Tue Jan 10 06:11:46 2012 +0100
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+
+import unittest
+
+import xmlrpclib
+from stopableServer import init_server
+from iro.anbieter.content import SMS,FAX,Mail
+
+class TestServer(unittest.TestCase):
+
+ def setUp(self):
+ self.i = init_server()
+ self.serv=self.i.serv
+ self.serv.start()
+
+ def tearDown(self):
+ self.serv.stop()
+
+
+ def SendSMS(self,msg):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ id=client.startSMS(msg,["01234", ] )
+ self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': unicode(msg)}} )
+ ele=self.i.queue.get(.1)
+ self.assertEqual(ele.getRecipients(),["01234", ] )
+ self.assertNotEqual(ele.getMessage(),SMS('') )
+ self.assertEqual(ele.getMessage(),SMS(msg) )
+
+ def testSimpleSMS(self):
+ self.SendSMS("test")
+
+ def testSpecialCharacters(self):
+ self.SendSMS(u"!\"§$%&/()=?\'")
+ self.SendSMS(u"@ł€ł€¶ŧł¼¼½¬¬↓ŧ←ĸ↓→øđŋħ“”µ·…–|")
+
+ def testSendFAX(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ"
+ id=client.startFAX("test",xmlrpclib.Binary(msg),["01234", ] )
+ self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} )
+ ele=self.i.queue.get(.1)
+ self.assertEqual(ele.getRecipients(),["01234", ] )
+ self.assertEqual(ele.getMessage(),FAX('test','',[msg]))
+
+ def testDoubleFAX(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ"
+ pdf=open('tests/test.pdf').read()
+ id=client.startFAX("test",[xmlrpclib.Binary(msg),xmlrpclib.Binary(pdf)],["01234", ] )
+ self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} )
+ ele=self.i.queue.get(.1)
+ self.assertEqual(ele.getRecipients(),["01234", ] )
+ self.assertEqual(ele.getMessage(),FAX('test','',[msg, pdf]))
+
+ def testSendMail(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ msg=u"2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ"
+ id=client.startMail("test",msg,["test@test.de", ],'absender@test.de' )
+ self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} )
+ ele=self.i.queue.get(.1)
+ self.assertEqual(ele.getRecipients(),["test@test.de", ] )
+ self.assertEqual(ele.getMessage(),Mail('test',msg,'absender@test.de'))
+ self.assertEqual(ele.getMessage().as_string(),"""Content-Type: text/plain; charset="utf-8"
+MIME-Version: 1.0
+Content-Transfer-Encoding: base64
+Subject: =?utf-8?q?test?=
+
+MjEzNHdlcmdzZGZnNHc1NnEzNDEzNMOmxb/DsMSRw6bDsMW/xJHFi8KzQMK8xafDpsOwxJHFi8WC
+4oKswrbFp+KCrMK2xac=
+""")
+ sub=u"³¼½ſðđŋſ€¼½ÖÄÜß"
+ id=client.startMail(sub,msg,["test@test.de", ],'absender@test.de' )
+ self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': sub}})
+ ele=self.i.queue.get(.1)
+ self.assertEqual(ele.getMessage(),Mail(sub, msg, 'absender@test.de'))
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/old/testWorker.py Tue Jan 10 06:11:46 2012 +0100
@@ -0,0 +1,147 @@
+# -*- coding: utf-8 -*-
+
+import unittest
+import logging
+from time import sleep
+
+from multiprocessing import Queue
+from multiprocessing.managers import BaseManager, ListProxy
+
+from iro.worker import Worker
+from iro.job import Job, SMSJob
+from iro.anbieter.anbieter import anbieter
+from iro.anbieter.content import SMS
+from iro.providerlist import Providerlist
+
+from logging.handlers import BufferingHandler
+
+class MyHandler(BufferingHandler):
+ def __init__(self,buffer=None):
+ '''BufferingHandler takes a "capacity" argument
+ so as to know when to flush. As we're overriding
+ shouldFlush anyway, we can set a capacity of zero.
+ You can call flush() manually to clear out the
+ buffer.
+ buffer: log messages to this buffer, needed f.ex for processes or threads'''
+ BufferingHandler.__init__(self, 0)
+ self.buffer=buffer
+
+ def shouldFlush(self):
+ return False
+
+ def emit(self, record):
+ if record.exc_info: #sonst geht das append schief, weil nicht picklebar
+ record.exc_info=record.exc_info[:-1]
+ self.buffer.append(record.__dict__)
+
+
+class BadJob(Job):
+ def start(self,id=None):
+ Job.start(self,id)
+ raise Exception("Error")
+
+#einen Manager anlegen, der Job und eine Liste anbietet
+class MyManager(BaseManager):
+ pass
+MyManager.register('Job', Job)
+MyManager.register('SMSJob', SMSJob)
+MyManager.register('BadJob', BadJob)
+MyManager.register('list', list, ListProxy)
+MyManager.register('Providerlist',Providerlist)
+
+class TestWorker(unittest.TestCase):
+ def setUp(self):
+ #erstelle eine Queue&Manager
+ self.manager = MyManager()
+ self.queue = Queue()
+
+ #manager starten, damit wir Logging anpassen können
+ self.manager.start()
+ self.setUpLogging()
+
+ self.providerlist=self.manager.Providerlist()
+ self.providerlist.add("test", anbieter() , ["sms", ])
+
+ #eigentlich Workerprocess starten
+ self.worker= Worker(self.queue)
+ self.worker.start()
+
+ def tearDown(self):
+ #Thread&Queue stoppen
+ self.stop()
+
+ #Logging änderungen rückgängig
+ self.tearDownLogging()
+ self.manager.shutdown()
+
+ def stop(self):
+ self.queue.close()
+ self.queue.join_thread()
+ self.worker.terminate()
+
+
+ def setUpLogging(self):
+ '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
+ #wir brauchen eine threadsichere liste
+ self.buffer=self.manager.list()
+
+ #Handler erstellen, der in den Buffer schreibt
+ self.handler = h = MyHandler(self.buffer)
+ self.logger =l= logging.getLogger()
+
+ #Level anpassen
+ l.setLevel(logging.DEBUG)
+ h.setLevel(logging.DEBUG)
+ l.addHandler(h)
+
+ def tearDownLogging(self):
+ '''crazy logging hacks wieder entfernen'''
+ self.logger.removeHandler(self.handler)
+ self.logger.setLevel(logging.NOTSET)
+ self.handler.close()
+
+
+ def testJob(self):
+ '''einen Job verarbeiten'''
+ job=self.manager.Job(None,None,"test")
+ self.assertEqual(job.getStatus(),("init",{}))
+ self.queue.put(job)
+ sleep(.1)
+ self.stop()
+ self.assertEqual(job.getStatus(),("started",{}))
+ self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
+ [(20,'Workerprocess läuft nun...'),
+ (20,'ein neuer Job(1)'),
+ (20,'Job(1) fertig ;)')])
+
+ def testBadJob(self):
+ '''einen Job verarbeiten, der fehlschlägt'''
+ job=self.manager.BadJob(None,None,"test")
+ self.assertEqual(job.getStatus(),("init",{}))
+ self.queue.put(job)
+ sleep(.1)
+ self.stop()
+ self.assertEqual(job.getStatus(),("error",{}))
+ self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
+ [(20,'Workerprocess läuft nun...'),
+ (20,'ein neuer Job(1)'),
+ (40,'Job(1) fehlgeschlagen :(')])
+ error=Exception('Error')
+ self.assertEqual(self.buffer[-1]['exc_info'][0],type(error))
+ self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error))
+
+ def testSMSJob(self):
+ job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345])
+ self.assertEqual(job.getStatus(),("init",{}))
+ self.queue.put(job)
+ sleep(.1)
+ self.stop()
+ self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []}))
+ self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
+ [(20,'Workerprocess läuft nun...'),
+ (20,'ein neuer Job(1)'),
+ (20,'Job(1) fertig ;)')])
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/old/testXMLRPCServer.py Tue Jan 10 06:11:46 2012 +0100
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+
+import unittest
+
+import xmlrpclib
+from stopableServer import init_server
+
+class TestServer(unittest.TestCase):
+
+ def setUp(self):
+ self.i = init_server()
+ self.serv=self.i.serv
+
+ self.serv.start()
+
+ def tearDown(self):
+ self.serv.stop()
+
+ def testLogin(self):
+ self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {})
+ self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {})
+ self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status)
+ self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status)
+
+ def testsendSMS(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ id=client.startSMS("test",["01234", ] )
+ self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} )
+
+ def testTwoUser(self):
+ u1="https://test:test@localhost:8000"
+ u2="https://test2:test2@localhost:8000"
+ admin="https://admin:admin@localhost:8000"
+ client1=xmlrpclib.Server(u1)
+ client2=xmlrpclib.Server(u2)
+ admin=xmlrpclib.Server(admin)
+ id1=client1.startSMS("test",["01234"] )
+ self.assertEqual(client2.status(),{} )
+ self.assertEqual(admin.status(id1),{id1: {'status': ['init', {}], 'name': 'test'}} )
+ id2=client2.startSMS("test2",["01234"] )
+ self.assertNotEqual(id1, id2)
+ self.assertEqual(client1.status(),{id1: {'status': ['init', {}], 'name': 'test'}})
+ self.assertEqual(client2.status(),{id2: {'status': ['init', {}], 'name': 'test2'}})
+ self.assertEqual(admin.status(),{id1: {'status': ['init', {}], 'name': 'test'},
+ id2: {'status': ['init', {}], 'name': 'test2'}} )
+
+ self.assertEqual(client2.status(id1), {})
+ self.assertEqual(client1.status(id2), {})
+
+ def testGetProvider(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ self.assertEqual(client.getProvider("sms"), ["fax.de","geonet", "sipgate", "smstrade"])
+ self.assertEqual(client.getProvider("fax"), ["fax.de","geonet", "sipgate"])
+ self.assertEqual(client.getProvider("mail"), ["localhost"])
+
+ self.assertRaises(xmlrpclib.ProtocolError,client.getProvider, "temp")
+
+ def testGetDefault(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ self.assertEqual(client.getDefaultProvider("sms"), "smstrade")
+ self.assertEqual(client.getDefaultProvider("fax"),"sipgate")
+ self.assertEqual(client.getDefaultProvider("mail"), "localhost")
+
+ self.assertRaises(xmlrpclib.ProtocolError,client.getDefaultProvider, "temp")
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/old/testloglock.py Tue Jan 10 06:11:46 2012 +0100
@@ -0,0 +1,98 @@
+
+import unittest
+import os
+import tempfile
+import signal
+import threading
+import time
+
+class testLogLock(unittest.TestCase):
+ def test_ThreadingAndLocks(self):
+ #create a thread, have that thread grab a lock, and sleep.
+ fd, file = tempfile.mkstemp('.nonlog')
+ _lock = threading.RLock()
+ os.close(fd)
+ def locker():
+ os.write(fd, 'Thread acquiring lock\n')
+ _lock.acquire()
+ os.write(fd, 'Thread acquired lock\n')
+ time.sleep(0.4)
+ os.write(fd, 'Thread releasing lock\n')
+ _lock.release()
+ os.write(fd, 'Thread released lock\n')
+
+ #Then in the main thread, throw a signal to self
+ def handleSignal(sigNum, frame):
+ os.write(fd, 'Main Thread acquiring lock\n')
+ lock = False
+ endtime = time.time() + 1.0
+ while not lock:
+ lock = _lock.acquire(blocking=0)
+ time.sleep(0.01)
+ if time.time() > endtime:
+ break
+ if not lock:
+ os.write(fd, 'Main Thread could not acquire lock\n')
+ return
+ os.write(fd, 'Main Thread acquired lock\n')
+ os.write(fd, 'Main Thread releasing lock\n')
+ _lock.release()
+ os.write(fd, 'Main Thread released lock\n')
+
+ sighndlr = signal.signal(signal.SIGUSR1, handleSignal)
+ try:
+ fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT)
+ thread = threading.Thread(target=locker)
+ thread.start()
+ time.sleep(0.1)
+ os.kill(os.getpid(), signal.SIGUSR1)
+ thread.join()
+
+ #check the results
+ os.close(fd)
+ fileconts = open(file, 'r').read()
+ self.assertTrue('Main Thread released lock' in fileconts)
+ self.assertEqual(fileconts,
+ '''Thread acquiring lock
+Thread acquired lock
+Main Thread acquiring lock
+Thread releasing lock
+Thread released lock
+Main Thread acquired lock
+Main Thread releasing lock
+Main Thread released lock
+''')
+
+ #Now try after acquiring the lock from the main thread
+ fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT)
+ _lock.acquire()
+ thread = threading.Thread(target=locker)
+ thread.start()
+ time.sleep(0.1)
+ os.kill(os.getpid(), signal.SIGUSR1)
+ _lock.release()
+ thread.join()
+ os.close(fd)
+ fileconts = open(file, 'r').read()
+ self.assertEqual(fileconts,
+ '''Thread acquiring lock
+Main Thread acquiring lock
+Main Thread acquired lock
+Main Thread releasing lock
+Main Thread released lock
+Thread acquired lock
+Thread releasing lock
+Thread released lock
+''')
+
+ finally:
+ signal.signal(signal.SIGUSR1, sighndlr)
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+ try:
+ os.unlink(file)
+ except OSError:
+ pass
+
--- a/tests/stopableServer.py Tue Jan 10 06:07:25 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,118 +0,0 @@
-import ConfigParser
-
-import threading
-
-from multiprocessing import Queue
-from multiprocessing.managers import BaseManager
-
-from iro import xmlrpc,anbieter
-from iro.user import User, Admin
-from iro.iro import MySMTP,MySmstrade,MyUserDB
-from iro.job import SMSJob, FAXJob, MailJob
-from iro.joblist import Joblist
-from iro.providerlist import Providerlist
-
-class StoppableXMLRPCServer(xmlrpc.SecureUserDBXMLRPCServer, threading.Thread):
- running=False
- def __init__(self, *args, **kwargs):
- xmlrpc.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
- threading.Thread.__init__(self)
- self.timeout=.5
-
- def start(self):
- self.running=True
- threading.Thread.start(self)
-
-
- def run(self):
- # *serve_forever* muss in einem eigenen Thread laufen, damit man es
- # unterbrochen werden kann!
- while (self.running):
- try:
- self.handle_request()
- except :
- break
-
- def stop(self):
- if (self.running):
- self.running=False
- self.server_close()
- self.join()
-
- def __del__(self):
- self.stop()
-
- def __enter__(self):
- self.start()
- return self
-
- def __exit__(self,type,value,traceback):
- self.stop()
-
-
-class Internal:
- pass
-
-def init_server():
- userlist=[{"name":"test","password":"test", "class":User},
- {"name":"test2","password":"test2", "class": User},
- {"name":"admin","password":"admin", "class": Admin}]
-
-
-
- class MyManager(BaseManager):
- pass
-
- internal=Internal()
-
- MyManager.register('SMSJob', SMSJob)
- MyManager.register('FaxJob', FAXJob)
- MyManager.register('MailJob',MailJob)
- MyManager.register('Providerlist',Providerlist)
- manager = MyManager()
- manager.start()
-
- internal.manager=manager
-
- #anbieter erzeugen und konfigurieren
- sip=anbieter.sipgate()
- sip.read_basic_config("iro.conf")
-
- localhost=MySMTP()
- localhost.read_basic_config("iro.conf")
-
- smstrade=MySmstrade()
- smstrade.read_basic_config("iro.conf")
-
- #Benutzerdatenbank erstellen
- queue = Queue()
- internal.queue=queue
- provider=Providerlist()
- internal.provider=provider
- provider.add("sipgate", sip, ["sms", "fax", ])
- provider.add("smstrade", smstrade, ["sms", ])
- provider.add("geonet", None, ["sms", "fax", ])
- provider.add("fax.de", None, ["sms", "fax", ])
- provider.add("localhost", localhost, ["mail", ])
- provider.setDefault("sms","smstrade")
- provider.setDefault("fax","sipgate")
- provider.setDefault("mail","localhost")
- jobqueue=Joblist(manager, queue, provider)
- internal.jobqueue=jobqueue
- userdb=MyUserDB(userlist,jobqueue)
- internal.userdb=userdb
-
-
- #Server starten
- cp = ConfigParser.ConfigParser()
- cp.read(["iro.conf"])
- cert=cp.get('server', 'cert')
- key=cp.get('server', 'key')
- serv = StoppableXMLRPCServer(addr=("localhost", 8000),
- userdb=userdb,
- certificate=cert,privatekey=key,
- logRequests=False)
- serv.relam="xmlrpc"
- internal.serv=serv
- return internal
-
--- a/tests/testJob.py Tue Jan 10 06:07:25 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,82 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-
-import xmlrpclib
-from stopableServer import init_server
-from iro.anbieter.content import SMS,FAX,Mail
-
-class TestServer(unittest.TestCase):
-
- def setUp(self):
- self.i = init_server()
- self.serv=self.i.serv
- self.serv.start()
-
- def tearDown(self):
- self.serv.stop()
-
-
- def SendSMS(self,msg):
- servstr="https://test:test@localhost:8000"
- client=xmlrpclib.Server(servstr)
- id=client.startSMS(msg,["01234", ] )
- self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': unicode(msg)}} )
- ele=self.i.queue.get(.1)
- self.assertEqual(ele.getRecipients(),["01234", ] )
- self.assertNotEqual(ele.getMessage(),SMS('') )
- self.assertEqual(ele.getMessage(),SMS(msg) )
-
- def testSimpleSMS(self):
- self.SendSMS("test")
-
- def testSpecialCharacters(self):
- self.SendSMS(u"!\"§$%&/()=?\'")
- self.SendSMS(u"@ł€ł€¶ŧł¼¼½¬¬↓ŧ←ĸ↓→øđŋħ“”µ·…–|")
-
- def testSendFAX(self):
- servstr="https://test:test@localhost:8000"
- client=xmlrpclib.Server(servstr)
- msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ"
- id=client.startFAX("test",xmlrpclib.Binary(msg),["01234", ] )
- self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} )
- ele=self.i.queue.get(.1)
- self.assertEqual(ele.getRecipients(),["01234", ] )
- self.assertEqual(ele.getMessage(),FAX('test','',[msg]))
-
- def testDoubleFAX(self):
- servstr="https://test:test@localhost:8000"
- client=xmlrpclib.Server(servstr)
- msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ"
- pdf=open('tests/test.pdf').read()
- id=client.startFAX("test",[xmlrpclib.Binary(msg),xmlrpclib.Binary(pdf)],["01234", ] )
- self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} )
- ele=self.i.queue.get(.1)
- self.assertEqual(ele.getRecipients(),["01234", ] )
- self.assertEqual(ele.getMessage(),FAX('test','',[msg, pdf]))
-
- def testSendMail(self):
- servstr="https://test:test@localhost:8000"
- client=xmlrpclib.Server(servstr)
- msg=u"2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ"
- id=client.startMail("test",msg,["test@test.de", ],'absender@test.de' )
- self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} )
- ele=self.i.queue.get(.1)
- self.assertEqual(ele.getRecipients(),["test@test.de", ] )
- self.assertEqual(ele.getMessage(),Mail('test',msg,'absender@test.de'))
- self.assertEqual(ele.getMessage().as_string(),"""Content-Type: text/plain; charset="utf-8"
-MIME-Version: 1.0
-Content-Transfer-Encoding: base64
-Subject: =?utf-8?q?test?=
-
-MjEzNHdlcmdzZGZnNHc1NnEzNDEzNMOmxb/DsMSRw6bDsMW/xJHFi8KzQMK8xafDpsOwxJHFi8WC
-4oKswrbFp+KCrMK2xac=
-""")
- sub=u"³¼½ſðđŋſ€¼½ÖÄÜß"
- id=client.startMail(sub,msg,["test@test.de", ],'absender@test.de' )
- self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': sub}})
- ele=self.i.queue.get(.1)
- self.assertEqual(ele.getMessage(),Mail(sub, msg, 'absender@test.de'))
-
-if __name__ == "__main__":
- unittest.main()
--- a/tests/testWorker.py Tue Jan 10 06:07:25 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,147 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-import logging
-from time import sleep
-
-from multiprocessing import Queue
-from multiprocessing.managers import BaseManager, ListProxy
-
-from iro.worker import Worker
-from iro.job import Job, SMSJob
-from iro.anbieter.anbieter import anbieter
-from iro.anbieter.content import SMS
-from iro.providerlist import Providerlist
-
-from logging.handlers import BufferingHandler
-
-class MyHandler(BufferingHandler):
- def __init__(self,buffer=None):
- '''BufferingHandler takes a "capacity" argument
- so as to know when to flush. As we're overriding
- shouldFlush anyway, we can set a capacity of zero.
- You can call flush() manually to clear out the
- buffer.
- buffer: log messages to this buffer, needed f.ex for processes or threads'''
- BufferingHandler.__init__(self, 0)
- self.buffer=buffer
-
- def shouldFlush(self):
- return False
-
- def emit(self, record):
- if record.exc_info: #sonst geht das append schief, weil nicht picklebar
- record.exc_info=record.exc_info[:-1]
- self.buffer.append(record.__dict__)
-
-
-class BadJob(Job):
- def start(self,id=None):
- Job.start(self,id)
- raise Exception("Error")
-
-#einen Manager anlegen, der Job und eine Liste anbietet
-class MyManager(BaseManager):
- pass
-MyManager.register('Job', Job)
-MyManager.register('SMSJob', SMSJob)
-MyManager.register('BadJob', BadJob)
-MyManager.register('list', list, ListProxy)
-MyManager.register('Providerlist',Providerlist)
-
-class TestWorker(unittest.TestCase):
- def setUp(self):
- #erstelle eine Queue&Manager
- self.manager = MyManager()
- self.queue = Queue()
-
- #manager starten, damit wir Logging anpassen können
- self.manager.start()
- self.setUpLogging()
-
- self.providerlist=self.manager.Providerlist()
- self.providerlist.add("test", anbieter() , ["sms", ])
-
- #eigentlich Workerprocess starten
- self.worker= Worker(self.queue)
- self.worker.start()
-
- def tearDown(self):
- #Thread&Queue stoppen
- self.stop()
-
- #Logging änderungen rückgängig
- self.tearDownLogging()
- self.manager.shutdown()
-
- def stop(self):
- self.queue.close()
- self.queue.join_thread()
- self.worker.terminate()
-
-
- def setUpLogging(self):
- '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
- #wir brauchen eine threadsichere liste
- self.buffer=self.manager.list()
-
- #Handler erstellen, der in den Buffer schreibt
- self.handler = h = MyHandler(self.buffer)
- self.logger =l= logging.getLogger()
-
- #Level anpassen
- l.setLevel(logging.DEBUG)
- h.setLevel(logging.DEBUG)
- l.addHandler(h)
-
- def tearDownLogging(self):
- '''crazy logging hacks wieder entfernen'''
- self.logger.removeHandler(self.handler)
- self.logger.setLevel(logging.NOTSET)
- self.handler.close()
-
-
- def testJob(self):
- '''einen Job verarbeiten'''
- job=self.manager.Job(None,None,"test")
- self.assertEqual(job.getStatus(),("init",{}))
- self.queue.put(job)
- sleep(.1)
- self.stop()
- self.assertEqual(job.getStatus(),("started",{}))
- self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
- [(20,'Workerprocess läuft nun...'),
- (20,'ein neuer Job(1)'),
- (20,'Job(1) fertig ;)')])
-
- def testBadJob(self):
- '''einen Job verarbeiten, der fehlschlägt'''
- job=self.manager.BadJob(None,None,"test")
- self.assertEqual(job.getStatus(),("init",{}))
- self.queue.put(job)
- sleep(.1)
- self.stop()
- self.assertEqual(job.getStatus(),("error",{}))
- self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
- [(20,'Workerprocess läuft nun...'),
- (20,'ein neuer Job(1)'),
- (40,'Job(1) fehlgeschlagen :(')])
- error=Exception('Error')
- self.assertEqual(self.buffer[-1]['exc_info'][0],type(error))
- self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error))
-
- def testSMSJob(self):
- job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345])
- self.assertEqual(job.getStatus(),("init",{}))
- self.queue.put(job)
- sleep(.1)
- self.stop()
- self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []}))
- self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
- [(20,'Workerprocess läuft nun...'),
- (20,'ein neuer Job(1)'),
- (20,'Job(1) fertig ;)')])
-
-
-if __name__ == "__main__":
- unittest.main()
--- a/tests/testXMLRPCServer.py Tue Jan 10 06:07:25 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,71 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-
-import xmlrpclib
-from stopableServer import init_server
-
-class TestServer(unittest.TestCase):
-
- def setUp(self):
- self.i = init_server()
- self.serv=self.i.serv
-
- self.serv.start()
-
- def tearDown(self):
- self.serv.stop()
-
- def testLogin(self):
- self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {})
- self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {})
- self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status)
- self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status)
-
- def testsendSMS(self):
- servstr="https://test:test@localhost:8000"
- client=xmlrpclib.Server(servstr)
- id=client.startSMS("test",["01234", ] )
- self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} )
-
- def testTwoUser(self):
- u1="https://test:test@localhost:8000"
- u2="https://test2:test2@localhost:8000"
- admin="https://admin:admin@localhost:8000"
- client1=xmlrpclib.Server(u1)
- client2=xmlrpclib.Server(u2)
- admin=xmlrpclib.Server(admin)
- id1=client1.startSMS("test",["01234"] )
- self.assertEqual(client2.status(),{} )
- self.assertEqual(admin.status(id1),{id1: {'status': ['init', {}], 'name': 'test'}} )
- id2=client2.startSMS("test2",["01234"] )
- self.assertNotEqual(id1, id2)
- self.assertEqual(client1.status(),{id1: {'status': ['init', {}], 'name': 'test'}})
- self.assertEqual(client2.status(),{id2: {'status': ['init', {}], 'name': 'test2'}})
- self.assertEqual(admin.status(),{id1: {'status': ['init', {}], 'name': 'test'},
- id2: {'status': ['init', {}], 'name': 'test2'}} )
-
- self.assertEqual(client2.status(id1), {})
- self.assertEqual(client1.status(id2), {})
-
- def testGetProvider(self):
- servstr="https://test:test@localhost:8000"
- client=xmlrpclib.Server(servstr)
- self.assertEqual(client.getProvider("sms"), ["fax.de","geonet", "sipgate", "smstrade"])
- self.assertEqual(client.getProvider("fax"), ["fax.de","geonet", "sipgate"])
- self.assertEqual(client.getProvider("mail"), ["localhost"])
-
- self.assertRaises(xmlrpclib.ProtocolError,client.getProvider, "temp")
-
- def testGetDefault(self):
- servstr="https://test:test@localhost:8000"
- client=xmlrpclib.Server(servstr)
- self.assertEqual(client.getDefaultProvider("sms"), "smstrade")
- self.assertEqual(client.getDefaultProvider("fax"),"sipgate")
- self.assertEqual(client.getDefaultProvider("mail"), "localhost")
-
- self.assertRaises(xmlrpclib.ProtocolError,client.getDefaultProvider, "temp")
-
-
-if __name__ == "__main__":
- unittest.main()
--- a/tests/testloglock.py Tue Jan 10 06:07:25 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,98 +0,0 @@
-
-import unittest
-import os
-import tempfile
-import signal
-import threading
-import time
-
-class testLogLock(unittest.TestCase):
- def test_ThreadingAndLocks(self):
- #create a thread, have that thread grab a lock, and sleep.
- fd, file = tempfile.mkstemp('.nonlog')
- _lock = threading.RLock()
- os.close(fd)
- def locker():
- os.write(fd, 'Thread acquiring lock\n')
- _lock.acquire()
- os.write(fd, 'Thread acquired lock\n')
- time.sleep(0.4)
- os.write(fd, 'Thread releasing lock\n')
- _lock.release()
- os.write(fd, 'Thread released lock\n')
-
- #Then in the main thread, throw a signal to self
- def handleSignal(sigNum, frame):
- os.write(fd, 'Main Thread acquiring lock\n')
- lock = False
- endtime = time.time() + 1.0
- while not lock:
- lock = _lock.acquire(blocking=0)
- time.sleep(0.01)
- if time.time() > endtime:
- break
- if not lock:
- os.write(fd, 'Main Thread could not acquire lock\n')
- return
- os.write(fd, 'Main Thread acquired lock\n')
- os.write(fd, 'Main Thread releasing lock\n')
- _lock.release()
- os.write(fd, 'Main Thread released lock\n')
-
- sighndlr = signal.signal(signal.SIGUSR1, handleSignal)
- try:
- fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT)
- thread = threading.Thread(target=locker)
- thread.start()
- time.sleep(0.1)
- os.kill(os.getpid(), signal.SIGUSR1)
- thread.join()
-
- #check the results
- os.close(fd)
- fileconts = open(file, 'r').read()
- self.assertTrue('Main Thread released lock' in fileconts)
- self.assertEqual(fileconts,
- '''Thread acquiring lock
-Thread acquired lock
-Main Thread acquiring lock
-Thread releasing lock
-Thread released lock
-Main Thread acquired lock
-Main Thread releasing lock
-Main Thread released lock
-''')
-
- #Now try after acquiring the lock from the main thread
- fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT)
- _lock.acquire()
- thread = threading.Thread(target=locker)
- thread.start()
- time.sleep(0.1)
- os.kill(os.getpid(), signal.SIGUSR1)
- _lock.release()
- thread.join()
- os.close(fd)
- fileconts = open(file, 'r').read()
- self.assertEqual(fileconts,
- '''Thread acquiring lock
-Main Thread acquiring lock
-Main Thread acquired lock
-Main Thread releasing lock
-Main Thread released lock
-Thread acquired lock
-Thread releasing lock
-Thread released lock
-''')
-
- finally:
- signal.signal(signal.SIGUSR1, sighndlr)
- try:
- os.close(fd)
- except OSError:
- pass
- try:
- os.unlink(file)
- except OSError:
- pass
-