diff -r 265124610789 -r fea4c6760ca5 tests/testWorker.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/testWorker.py Sun Dec 18 18:03:45 2011 +0100 @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- + +import unittest +import logging +from time import sleep + +from multiprocessing import Queue +from multiprocessing.managers import BaseManager, ListProxy + +from iro.worker import Worker +from iro.job import Job, SMSJob +from iro.anbieter.anbieter import anbieter +from iro.anbieter.content import SMS +from iro.providerlist import Providerlist + +from logging.handlers import BufferingHandler + +class MyHandler(BufferingHandler): + def __init__(self,buffer=None): + '''BufferingHandler takes a "capacity" argument + so as to know when to flush. As we're overriding + shouldFlush anyway, we can set a capacity of zero. + You can call flush() manually to clear out the + buffer. + buffer: log messages to this buffer, needed f.ex for processes or threads''' + BufferingHandler.__init__(self, 0) + self.buffer=buffer + + def shouldFlush(self): + return False + + def emit(self, record): + if record.exc_info: #sonst geht das append schief, weil nicht picklebar + record.exc_info=record.exc_info[:-1] + self.buffer.append(record.__dict__) + + +class BadJob(Job): + def start(self,id=None): + Job.start(self,id) + raise Exception("Error") + +#einen Manager anlegen, der Job und eine Liste anbietet +class MyManager(BaseManager): + pass +MyManager.register('Job', Job) +MyManager.register('SMSJob', SMSJob) +MyManager.register('BadJob', BadJob) +MyManager.register('list', list, ListProxy) +MyManager.register('Providerlist',Providerlist) + +class TestWorker(unittest.TestCase): + def setUp(self): + #erstelle eine Queue&Manager + self.manager = MyManager() + self.queue = Queue() + + #manager starten, damit wir Logging anpassen können + self.manager.start() + self.setUpLogging() + + self.providerlist=self.manager.Providerlist() + self.providerlist.add("test", anbieter() , ["sms", ]) + + #eigentlich Workerprocess starten + self.worker= Worker(self.queue) + self.worker.start() + + def tearDown(self): + #Thread&Queue stoppen + self.stop() + + #Logging änderungen rückgängig + self.tearDownLogging() + self.manager.shutdown() + + def stop(self): + self.queue.close() + self.queue.join_thread() + self.worker.terminate() + + + def setUpLogging(self): + '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind''' + #wir brauchen eine threadsichere liste + self.buffer=self.manager.list() + + #Handler erstellen, der in den Buffer schreibt + self.handler = h = MyHandler(self.buffer) + self.logger =l= logging.getLogger() + + #Level anpassen + l.setLevel(logging.DEBUG) + h.setLevel(logging.DEBUG) + l.addHandler(h) + + def tearDownLogging(self): + '''crazy logging hacks wieder entfernen''' + self.logger.removeHandler(self.handler) + self.logger.setLevel(logging.NOTSET) + self.handler.close() + + + def testJob(self): + '''einen Job verarbeiten''' + job=self.manager.Job(None,None,"test") + self.assertEqual(job.getStatus(),("init",{})) + self.queue.put(job) + sleep(.1) + self.stop() + self.assertEqual(job.getStatus(),("started",{})) + self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], + [(20,'Workerprocess läuft nun...'), + (20,'ein neuer Job(1)'), + (20,'Job(1) fertig ;)')]) + + def testBadJob(self): + '''einen Job verarbeiten, der fehlschlägt''' + job=self.manager.BadJob(None,None,"test") + self.assertEqual(job.getStatus(),("init",{})) + self.queue.put(job) + sleep(.1) + self.stop() + self.assertEqual(job.getStatus(),("error",{})) + self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], + [(20,'Workerprocess läuft nun...'), + (20,'ein neuer Job(1)'), + (40,'Job(1) fehlgeschlagen :(')]) + error=Exception('Error') + self.assertEqual(self.buffer[-1]['exc_info'][0],type(error)) + self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error)) + + def testSMSJob(self): + job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345]) + self.assertEqual(job.getStatus(),("init",{})) + self.queue.put(job) + sleep(.1) + self.stop() + self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []})) + self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], + [(20,'Workerprocess läuft nun...'), + (20,'ein neuer Job(1)'), + (20,'Job(1) fertig ;)')]) + + +if __name__ == "__main__": + unittest.main()