iro/tests/testWorker.py
author Sandro Knauß <knauss@netzguerilla.net>
Tue, 05 Oct 2010 03:24:01 +0200
changeset 40 5d177c9d7fd2
child 41 02e9b54ef4f0
permissions -rw-r--r--
testWorker.py hinzugefügt

# -*- coding: utf-8 -*-

import unittest
import logging
from time import sleep

from multiprocessing import Queue
from multiprocessing.managers import BaseManager
import iro
from ..worker import Worker
from ..job import Job

from logging.handlers import BufferingHandler

class MyHandler(BufferingHandler):
    def __init__(self):
        '''BufferingHandler takes a "capacity" argument
        so as to know when to flush. As we're overriding
        shouldFlush anyway, we can set a capacity of zero.
        You can call flush() manually to clear out the
        buffer.'''
        BufferingHandler.__init__(self, 0)

    def shouldFlush(self):
        return False

    def emit(self, record):
        self.buffer.append(record.__dict__)

class MyManager(BaseManager):
    pass

MyManager.register('Job', Job) 
    

class TestWorker(unittest.TestCase):
    def setUp(self):
        self.handler = h = MyHandler()
        self.logger = l = logging.getLogger()
        l.addHandler(h)

        self.manager = MyManager()
        self.manager.start()
        self.queue = Queue()
        self.worker=Worker(self.queue)
        self.worker.start()
        sleep(.5)
        

    def TearDown(self):
        self.manager.stop()
        self.queue.close()
        self.worker.terminate()
        self.logger.removeHandler(self.handler)
        self.handler.close()
    
    def testJob(self):
        job=Job(None,None,"test")
        self.assertEqual(job.status,"init")
        self.queue.put(job)
        sleep(.5)
        logging.debug("test")
        print self.handler.buffer
        self.assertEqual(job.status,"started")

if __name__ == "__main__":
    unittest.main()