# -*- coding: utf-8 -*-
import unittest
import logging
from time import sleep
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
import iro
from ..worker import Worker
from ..job import Job
from logging.handlers import BufferingHandler
class MyHandler(BufferingHandler):
    """Logging handler that keeps every emitted record in memory.

    Each record is stored in ``self.buffer`` as a plain dictionary (the
    ``__dict__`` of the ``LogRecord``).  Nothing is flushed automatically;
    call ``flush()`` manually to clear out the buffer.
    """

    def __init__(self):
        # BufferingHandler wants a "capacity" so it knows when to flush.
        # shouldFlush() below always answers False, so the value is never
        # consulted and zero serves as a placeholder.
        BufferingHandler.__init__(self, 0)

    def shouldFlush(self, record=None):
        """Return False so the buffer is never flushed automatically.

        ``record`` is accepted and ignored so that the override stays
        call-compatible with ``BufferingHandler.shouldFlush(record)``;
        calling it without an argument keeps working as before.
        """
        return False

    def emit(self, record):
        """Append the attribute dictionary of *record* to ``self.buffer``."""
        self.buffer.append(record.__dict__)
class MyManager(BaseManager):
    """Manager subclass for these tests; adds nothing of its own to BaseManager."""
# Register the Job class with MyManager under the type identifier 'Job'
# (see multiprocessing.managers.BaseManager.register).
MyManager.register('Job', Job)
class TestWorker(unittest.TestCase):
    """Exercise a Worker process that is fed Job objects through a Queue."""

    def setUp(self):
        """Attach a buffering log handler, then start the manager and a worker."""
        # Capture log records on the root logger so the test can inspect them.
        self.handler = h = MyHandler()
        self.logger = l = logging.getLogger()
        l.addHandler(h)
        self.manager = MyManager()
        self.manager.start()
        self.queue = Queue()
        self.worker=Worker(self.queue)
        self.worker.start()
        # Give the worker a moment to come up before the test body runs.
        sleep(.5)

    def tearDown(self):
        """Release everything that setUp() acquired.

        The method must be spelled ``tearDown``: unittest looks the hook up
        by that exact name, so any other capitalisation is silently never
        called and the manager, worker, queue and log handler would leak
        from one test to the next.
        """
        # BaseManager offers shutdown() for a manager started with start();
        # it has no stop() method.
        self.manager.shutdown()
        self.queue.close()
        self.worker.terminate()
        self.logger.removeHandler(self.handler)
        self.handler.close()

    def testJob(self):
        """Queue a job and check its status before and after the worker ran."""
        job=Job(None,None,"test")
        self.assertEqual(job.status,"init")
        self.queue.put(job)
        # Leave the worker some time to pick the job up.
        sleep(.5)
        # NOTE(review): the root logger's default level is WARNING, so this
        # record only reaches the handler if the level was lowered elsewhere
        # -- confirm.
        logging.debug("test")
        # Single-argument parenthesised form prints the same thing on
        # Python 2 and also parses on Python 3.
        print(self.handler.buffer)
        # NOTE(review): a multiprocessing Queue pickles what is put on it, so
        # whether this local ``job`` observes the worker's status change
        # depends on how Job is implemented -- confirm.
        self.assertEqual(job.status,"started")
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()