diff -r 265124610789 -r fea4c6760ca5 iro/tests/testWorker.py --- a/iro/tests/testWorker.py Sun Dec 18 16:43:22 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,147 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest -import logging -from time import sleep - -from multiprocessing import Queue -from multiprocessing.managers import BaseManager, ListProxy - -from iro.worker import Worker -from iro.job import Job, SMSJob -from iro.anbieter.anbieter import anbieter -from iro.anbieter.content import SMS -from iro.providerlist import Providerlist - -from logging.handlers import BufferingHandler - -class MyHandler(BufferingHandler): - def __init__(self,buffer=None): - '''BufferingHandler takes a "capacity" argument - so as to know when to flush. As we're overriding - shouldFlush anyway, we can set a capacity of zero. - You can call flush() manually to clear out the - buffer. - buffer: log messages to this buffer, needed f.ex for processes or threads''' - BufferingHandler.__init__(self, 0) - self.buffer=buffer - - def shouldFlush(self): - return False - - def emit(self, record): - if record.exc_info: #sonst geht das append schief, weil nicht picklebar - record.exc_info=record.exc_info[:-1] - self.buffer.append(record.__dict__) - - -class BadJob(Job): - def start(self,id=None): - Job.start(self,id) - raise Exception("Error") - -#einen Manager anlegen, der Job und eine Liste anbietet -class MyManager(BaseManager): - pass -MyManager.register('Job', Job) -MyManager.register('SMSJob', SMSJob) -MyManager.register('BadJob', BadJob) -MyManager.register('list', list, ListProxy) -MyManager.register('Providerlist',Providerlist) - -class TestWorker(unittest.TestCase): - def setUp(self): - #erstelle eine Queue&Manager - self.manager = MyManager() - self.queue = Queue() - - #manager starten, damit wir Logging anpassen können - self.manager.start() - self.setUpLogging() - - self.providerlist=self.manager.Providerlist() - self.providerlist.add("test", anbieter() , ["sms", ]) - - #eigentlich Workerprocess starten - self.worker= Worker(self.queue) - self.worker.start() - - def tearDown(self): - #Thread&Queue stoppen - self.stop() - - #Logging änderungen rückgängig - self.tearDownLogging() - self.manager.shutdown() - - def stop(self): - self.queue.close() - self.queue.join_thread() - self.worker.terminate() - - - def setUpLogging(self): - '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind''' - #wir brauchen eine threadsichere liste - self.buffer=self.manager.list() - - #Handler erstellen, der in den Buffer schreibt - self.handler = h = MyHandler(self.buffer) - self.logger =l= logging.getLogger() - - #Level anpassen - l.setLevel(logging.DEBUG) - h.setLevel(logging.DEBUG) - l.addHandler(h) - - def tearDownLogging(self): - '''crazy logging hacks wieder entfernen''' - self.logger.removeHandler(self.handler) - self.logger.setLevel(logging.NOTSET) - self.handler.close() - - - def testJob(self): - '''einen Job verarbeiten''' - job=self.manager.Job(None,None,"test") - self.assertEqual(job.getStatus(),("init",{})) - self.queue.put(job) - sleep(.1) - self.stop() - self.assertEqual(job.getStatus(),("started",{})) - self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], - [(20,'Workerprocess läuft nun...'), - (20,'ein neuer Job(1)'), - (20,'Job(1) fertig ;)')]) - - def testBadJob(self): - '''einen Job verarbeiten, der fehlschlägt''' - job=self.manager.BadJob(None,None,"test") - self.assertEqual(job.getStatus(),("init",{})) - self.queue.put(job) - sleep(.1) - self.stop() - self.assertEqual(job.getStatus(),("error",{})) - self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], - [(20,'Workerprocess läuft nun...'), - (20,'ein neuer Job(1)'), - (40,'Job(1) fehlgeschlagen :(')]) - error=Exception('Error') - self.assertEqual(self.buffer[-1]['exc_info'][0],type(error)) - self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error)) - - def testSMSJob(self): - job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345]) - self.assertEqual(job.getStatus(),("init",{})) - self.queue.put(job) - sleep(.1) - self.stop() - self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []})) - self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], - [(20,'Workerprocess läuft nun...'), - (20,'ein neuer Job(1)'), - (20,'Job(1) fertig ;)')]) - - -if __name__ == "__main__": - unittest.main()