iro/tests/testWorker.py
changeset 40 5d177c9d7fd2
child 41 02e9b54ef4f0
equal deleted inserted replaced
39:31a2b1cd4981 40:5d177c9d7fd2
       
     1 # -*- coding: utf-8 -*-
       
     2 
       
     3 import unittest
       
     4 import logging
       
     5 from time import sleep
       
     6 
       
     7 from multiprocessing import Queue
       
     8 from multiprocessing.managers import BaseManager
       
     9 import iro
       
    10 from ..worker import Worker
       
    11 from ..job import Job
       
    12 
       
    13 from logging.handlers import BufferingHandler
       
    14 
       
    15 class MyHandler(BufferingHandler):
       
    16     def __init__(self):
       
    17         '''BufferingHandler takes a "capacity" argument
       
    18         so as to know when to flush. As we're overriding
       
    19         shouldFlush anyway, we can set a capacity of zero.
       
    20         You can call flush() manually to clear out the
       
    21         buffer.'''
       
    22         BufferingHandler.__init__(self, 0)
       
    23 
       
    24     def shouldFlush(self):
       
    25         return False
       
    26 
       
    27     def emit(self, record):
       
    28         self.buffer.append(record.__dict__)
       
    29 
       
    30 class MyManager(BaseManager):
       
    31     pass
       
    32 
       
    33 MyManager.register('Job', Job) 
       
    34     
       
    35 
       
    36 class TestWorker(unittest.TestCase):
       
    37     def setUp(self):
       
    38         self.handler = h = MyHandler()
       
    39         self.logger = l = logging.getLogger()
       
    40         l.addHandler(h)
       
    41 
       
    42         self.manager = MyManager()
       
    43         self.manager.start()
       
    44         self.queue = Queue()
       
    45         self.worker=Worker(self.queue)
       
    46         self.worker.start()
       
    47         sleep(.5)
       
    48         
       
    49 
       
    50     def TearDown(self):
       
    51         self.manager.stop()
       
    52         self.queue.close()
       
    53         self.worker.terminate()
       
    54         self.logger.removeHandler(self.handler)
       
    55         self.handler.close()
       
    56     
       
    57     def testJob(self):
       
    58         job=Job(None,None,"test")
       
    59         self.assertEqual(job.status,"init")
       
    60         self.queue.put(job)
       
    61         sleep(.5)
       
    62         logging.debug("test")
       
    63         print self.handler.buffer
       
    64         self.assertEqual(job.status,"started")
       
    65 
       
    66 if __name__ == "__main__":
       
    67     unittest.main()