iro/tests/testWorker.py
author Sandro Knauß <knauss@netzguerilla.net>
Wed, 06 Oct 2010 04:28:19 +0200
changeset 41 02e9b54ef4f0
parent 40 5d177c9d7fd2
child 42 1a9f191389a4
permissions -rw-r--r--
testWorker benutzbar machen & test laufen lassen

# -*- coding: utf-8 -*-

import unittest
import logging
from time import sleep

from multiprocessing import Queue
from multiprocessing.managers import BaseManager,ListProxy

from iro.worker import Worker
from iro.job import Job

from logging.handlers import BufferingHandler

class MyHandler(BufferingHandler):
    '''Logging handler that stores every record as a plain dict in a list.

    Each emitted record is appended to ``self.buffer`` as a copy of the
    record's ``__dict__``, so tests can inspect fields such as ``name``,
    ``levelno`` and ``msg`` afterwards.
    '''

    def __init__(self, buffer=None):
        '''BufferingHandler takes a "capacity" argument
        so as to know when to flush. As we're overriding
        shouldFlush anyway, we can set a capacity of zero.

            buffer: list-like object (only ``append`` is used) that receives
                    the log entries, needed e.g. for processes or threads.
                    When omitted, the list created by BufferingHandler is
                    kept, so the handler is usable without arguments.'''
        BufferingHandler.__init__(self, 0)
        # Only replace the base class's list when a buffer was really given;
        # otherwise emit() would later fail on ``None.append``.
        if buffer is not None:
            self.buffer = buffer

    def shouldFlush(self, record=None):
        '''Never flush automatically.

        ``record`` is accepted (and ignored) to match the signature that
        BufferingHandler uses when it calls this hook.'''
        return False

    def emit(self, record):
        '''Append a dict copy of ``record`` to the buffer.

        The record itself is left untouched so that other handlers attached
        to the same logger still see the complete ``exc_info``.'''
        entry = dict(record.__dict__)
        if record.exc_info:
            # Drop the traceback (last element): it is not picklable and
            # would make the append to a shared buffer fail.
            entry['exc_info'] = record.exc_info[:-1]
        self.buffer.append(entry)

class MyManager(BaseManager):
    '''Manager used by the tests; the types it offers are registered below.'''


# Types that can be created through a started MyManager instance.
MyManager.register('Job', Job)
MyManager.register('list', list, proxytype=ListProxy)

class TestWorker(unittest.TestCase):
    '''Start a Worker process on a queue and check job status and log output.'''

    def setUp(self):
        '''Create queue and manager, redirect logging, start the worker.'''
        # create a queue and a manager
        self.manager = MyManager()
        self.queue = Queue()

        # start the manager first so that logging can be adjusted
        # (the log buffer is a list obtained from the manager)
        self.manager.start()
        self.setUpLogging()

        # start the actual worker process
        self.worker = Worker(self.queue)
        self.worker.start()

    def tearDown(self):
        '''Stop worker and queue, undo the logging changes, stop the manager.

        Every step runs even if an earlier one raises, so a failing stop()
        does not leave the manager process or the log handler behind.'''
        try:
            # stop worker and queue
            self.stop()
        finally:
            try:
                # undo the logging changes
                self.tearDownLogging()
            finally:
                self.manager.shutdown()

    def stop(self):
        '''Close the queue and terminate the worker process.

        Safe to call more than once (testJob calls it before tearDown does).'''
        self.queue.close()
        self.queue.join_thread()
        self.worker.terminate()
        # Reap the terminated process; the timeout keeps the test from
        # hanging should the worker not exit promptly.
        self.worker.join(1)

    def setUpLogging(self):
        '''Rewire logging so that all log messages end up in self.buffer.'''
        # a list provided by the manager (original note: a thread-safe
        # list is needed)
        self.buffer = self.manager.list()

        # create the handler that writes into the buffer
        self.handler = MyHandler(self.buffer)
        self.logger = logging.getLogger()

        # remember the current level so tearDownLogging can restore it
        self.oldLevel = self.logger.level

        # adjust the levels
        self.logger.setLevel(logging.DEBUG)
        self.handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self.handler)

    def tearDownLogging(self):
        '''Remove the logging hacks again and restore the previous level.'''
        self.logger.removeHandler(self.handler)
        self.logger.setLevel(self.oldLevel)
        self.handler.close()

    def testJob(self):
        '''process one job'''
        job = self.manager.Job(None, None, "test")
        self.assertEqual(job.getStatus(), "init")
        self.queue.put(job)
        # presumably gives the worker time to pick up the job -- timing based
        sleep(.1)
        self.stop()
        self.assertEqual(job.getStatus(), "started")
        # only the messages logged under the name "iro.worker" are compared
        workerLog = [(rec['levelno'], rec['msg'])
                     for rec in self.buffer if rec['name'] == "iro.worker"]
        self.assertEqual(workerLog,
                [(20,'Workerprocess läuft nun...'),
                 (20,'ein neuer Job(1)'),
                 (20,'Job(1) fertig ;)')])


# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()