tests/testWorker.py
author Sandro Knauß <knauss@netzguerilla.net>
Thu, 22 Dec 2011 03:13:34 +0100
branch devel
changeset 92 f479738b4879
parent 81 fea4c6760ca5
permissions -rw-r--r--
umbau zu MVC

# -*- coding: utf-8 -*-

import unittest
import logging
from time import sleep

from multiprocessing import Queue
from multiprocessing.managers import BaseManager, ListProxy

from iro.worker import Worker
from iro.job import Job, SMSJob
from iro.anbieter.anbieter import anbieter
from iro.anbieter.content import SMS
from iro.providerlist import Providerlist

from logging.handlers import BufferingHandler

class MyHandler(BufferingHandler):
    """Logging handler that appends each record's attribute dict to a list.

    The handler never flushes on its own; records stay in ``self.buffer``
    until somebody removes them.
    """

    def __init__(self, buffer=None):
        """Create the handler.

        BufferingHandler takes a "capacity" argument so as to know when to
        flush. As shouldFlush is overridden anyway, a capacity of zero is
        passed.

        buffer: list-like object the log records are appended to, needed
            e.g. when the records are produced by other processes or
            threads. When omitted, the plain list created by
            BufferingHandler is kept.
        """
        BufferingHandler.__init__(self, 0)
        # Only replace the base class' list when a buffer was really given;
        # otherwise emit() would try to append to None.
        if buffer is not None:
            self.buffer = buffer

    def shouldFlush(self, record=None):
        """Never request an automatic flush."""
        return False

    def emit(self, record):
        """Append a copy of the record's attribute dict to the buffer.

        When the record carries exception info, the last element (the
        traceback) is dropped in the stored copy, because otherwise the
        append goes wrong: the traceback is not picklable. The record
        itself is left untouched, so other handlers still see the complete
        exc_info.
        """
        data = dict(record.__dict__)
        if data.get('exc_info'):
            data['exc_info'] = data['exc_info'][:-1]
        self.buffer.append(data)


class BadJob(Job):
    """Job whose start() always ends with an exception."""

    def start(self, id=None):
        """Run the regular Job.start() with ``id``, then raise Exception("Error")."""
        Job.start(self, id)
        raise Exception("Error")

# Set up a manager that offers the job classes, a list and the provider list.
class MyManager(BaseManager):
    """BaseManager subclass that only carries the registrations below."""


MyManager.register('Job', callable=Job)
MyManager.register('SMSJob', callable=SMSJob)
MyManager.register('BadJob', callable=BadJob)
MyManager.register('list', callable=list, proxytype=ListProxy)
MyManager.register('Providerlist', callable=Providerlist)

class TestWorker(unittest.TestCase):
    """Feed jobs to a running Worker and check job status and log output."""

    def setUp(self):
        """Start the manager, redirect logging and start the worker."""
        # create the queue and the manager
        self.manager = MyManager()
        self.queue = Queue()

        # start the manager so that the logging can be adjusted
        self.manager.start()
        self.setUpLogging()

        self.providerlist = self.manager.Providerlist()
        self.providerlist.add("test", anbieter(), ["sms", ])

        # start the actual worker process
        self.worker = Worker(self.queue)
        self.worker.start()

    def tearDown(self):
        """Stop worker and queue, undo the logging changes, stop the manager.

        Every step runs even if an earlier one raises, so a failing stop()
        does not leave the handler installed or the manager running.
        """
        try:
            self.stop()
        finally:
            try:
                self.tearDownLogging()
            finally:
                self.manager.shutdown()

    def stop(self):
        """Close the queue and terminate the worker."""
        self.queue.close()
        self.queue.join_thread()
        self.worker.terminate()

    def setUpLogging(self):
        """Rework logging so that all log messages end up in self.buffer."""
        # a list served by the manager is used, so the buffer can be shared
        self.buffer = self.manager.list()

        # handler that writes into the buffer
        self.handler = MyHandler(self.buffer)
        self.logger = logging.getLogger()

        # remember the current root level so tearDownLogging can restore it
        self._previousLevel = self.logger.level
        self.logger.setLevel(logging.DEBUG)
        self.handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self.handler)

    def tearDownLogging(self):
        """Remove the logging changes made by setUpLogging again."""
        self.logger.removeHandler(self.handler)
        # restore the level found in setUpLogging instead of forcing NOTSET
        self.logger.setLevel(self._previousLevel)
        self.handler.close()

    def _process(self, job):
        """Check that ``job`` is fresh, queue it, wait briefly, then stop."""
        self.assertEqual(job.getStatus(), ("init", {}))
        self.queue.put(job)
        sleep(.1)
        self.stop()

    def _workerLog(self):
        """Return (levelno, msg) of every buffered "iro.worker" record."""
        return [(rec['levelno'], rec['msg'])
                for rec in self.buffer if rec['name'] == "iro.worker"]

    def testJob(self):
        """Process a job."""
        job = self.manager.Job(None, None, "test")
        self._process(job)
        self.assertEqual(job.getStatus(), ("started", {}))
        self.assertEqual(self._workerLog(),
                [(20, 'Workerprocess läuft nun...'),
                 (20, 'ein neuer Job(1)'),
                 (20, 'Job(1) fertig ;)')])

    def testBadJob(self):
        """Process a job that fails."""
        job = self.manager.BadJob(None, None, "test")
        self._process(job)
        self.assertEqual(job.getStatus(), ("error", {}))
        self.assertEqual(self._workerLog(),
                [(20, 'Workerprocess läuft nun...'),
                 (20, 'ein neuer Job(1)'),
                 (40, 'Job(1) fehlgeschlagen :(')])
        # the last buffered record must carry the exception raised by BadJob
        error = Exception('Error')
        self.assertEqual(self.buffer[-1]['exc_info'][0], type(error))
        self.assertEqual(str(self.buffer[-1]['exc_info'][1]), str(error))

    def testSMSJob(self):
        """Process an SMS job."""
        # NOTE(review): this used to be the legacy octal literal 012345,
        # i.e. the integer 5349; 0o12345 is the same value in a spelling
        # that also parses on Python 3. Presumably a phone number was
        # meant - confirm what SMSJob expects as recipients.
        job = self.manager.SMSJob(self.providerlist, "test", "name",
                                  SMS("message"), [0o12345])
        self._process(job)
        self.assertEqual(job.getStatus(), ("sended", {'failed': [], 'good': []}))
        self.assertEqual(self._workerLog(),
                [(20, 'Workerprocess läuft nun...'),
                 (20, 'ein neuer Job(1)'),
                 (20, 'Job(1) fertig ;)')])


# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()