# HG changeset patch # User Sandro Knauß # Date 1286332099 -7200 # Node ID 02e9b54ef4f00a9a5feccdf0a5fde20bde49ac26 # Parent 5d177c9d7fd2f0c7b41ad617a98413a9426cf482 testWorker benutzbar machen & test laufen lassen diff -r 5d177c9d7fd2 -r 02e9b54ef4f0 iro/tests/testWorker.py --- a/iro/tests/testWorker.py Tue Oct 05 03:24:01 2010 +0200 +++ b/iro/tests/testWorker.py Wed Oct 06 04:28:19 2010 +0200 @@ -5,63 +5,100 @@ from time import sleep from multiprocessing import Queue -from multiprocessing.managers import BaseManager -import iro -from ..worker import Worker -from ..job import Job +from multiprocessing.managers import BaseManager,ListProxy + +from iro.worker import Worker +from iro.job import Job from logging.handlers import BufferingHandler class MyHandler(BufferingHandler): - def __init__(self): + def __init__(self,buffer=None): '''BufferingHandler takes a "capacity" argument so as to know when to flush. As we're overriding shouldFlush anyway, we can set a capacity of zero. You can call flush() manually to clear out the - buffer.''' + buffer. + buffer: log messages to this buffer, needed f.ex for processes or threads''' BufferingHandler.__init__(self, 0) + self.buffer=buffer def shouldFlush(self): return False def emit(self, record): + if record.exc_info: #sonst geht das append schief, weil nicht picklebar + print record.exc_info + record.exc_info=record.exc_info[:-1] self.buffer.append(record.__dict__) class MyManager(BaseManager): pass - MyManager.register('Job', Job) - +MyManager.register('list', list, ListProxy) class TestWorker(unittest.TestCase): def setUp(self): - self.handler = h = MyHandler() - self.logger = l = logging.getLogger() + #erstelle eine Queue&Manager + self.manager = MyManager() + self.queue = Queue() + + #manager starten, damit wir Logging anpassen können + self.manager.start() + self.setUpLogging() + + #eigentlich Workerprocess starten + self.worker= Worker(self.queue) + self.worker.start() + + def tearDown(self): + #Thread&Queue stoppen + self.stop() + + #Logging änderungen rückgängig + self.tearDownLogging() + self.manager.shutdown() + + def stop(self): + self.queue.close() + self.queue.join_thread() + self.worker.terminate() + + + def setUpLogging(self): + '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind''' + #wir brauchen eine threadsichere liste + self.buffer=self.manager.list() + + #Handler erstellen, der in den Buffer schreibt + self.handler = h = MyHandler(self.buffer) + self.logger =l= logging.getLogger() + + #Level anpassen + l.setLevel(logging.DEBUG) + h.setLevel(logging.DEBUG) l.addHandler(h) - self.manager = MyManager() - self.manager.start() - self.queue = Queue() - self.worker=Worker(self.queue) - self.worker.start() - sleep(.5) - + def tearDownLogging(self): + '''crazy logging hacks wieder entfernen''' + self.logger.removeHandler(self.handler) + self.logger.setLevel(logging.NOTSET) + self.handler.close() + - def TearDown(self): - self.manager.stop() - self.queue.close() - self.worker.terminate() - self.logger.removeHandler(self.handler) - self.handler.close() - def testJob(self): - job=Job(None,None,"test") - self.assertEqual(job.status,"init") + '''einen Job verarbeiten''' + job=self.manager.Job(None,None,"test") + self.assertEqual(job.getStatus(),"init") self.queue.put(job) - sleep(.5) - logging.debug("test") - print self.handler.buffer - self.assertEqual(job.status,"started") + sleep(.1) + self.stop() + self.assertEqual(job.getStatus(),"started") + self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], + [(20,'Workerprocess läuft nun...'), + (20,'ein neuer Job(1)'), + (20,'Job(1) fertig ;)')]) + if __name__ == "__main__": unittest.main() diff -r 5d177c9d7fd2 -r 02e9b54ef4f0 iro/worker.py --- a/iro/worker.py Tue Oct 05 03:24:01 2010 +0200 +++ b/iro/worker.py Wed Oct 06 04:28:19 2010 +0200 @@ -4,7 +4,6 @@ from multiprocessing import Process import logging logger = logging.getLogger("iro.worker") -import time class Worker(Process): def __init__(self,queue): @@ -12,7 +11,7 @@ self.queue=queue def run(self): - logger.info('Worker thread läuft nun...') + logger.info('Workerprocess läuft nun...') id=0 while 1: job=self.queue.get() @@ -21,7 +20,8 @@ id+=1 logger.info('ein neuer Job(%d)' %(id)) try: - job.start(id) + job.start() + logger.info('Job(%d) fertig ;)'%(id)) except: logger.exception('Job(%d) fehlgeschlagen :('%(id))