iro/tests/testWorker.py
changeset 41 02e9b54ef4f0
parent 40 5d177c9d7fd2
child 42 1a9f191389a4
equal deleted inserted replaced
40:5d177c9d7fd2 41:02e9b54ef4f0
     3 import unittest
     3 import unittest
     4 import logging
     4 import logging
     5 from time import sleep
     5 from time import sleep
     6 
     6 
     7 from multiprocessing import Queue
     7 from multiprocessing import Queue
     8 from multiprocessing.managers import BaseManager
     8 from multiprocessing.managers import BaseManager,ListProxy
     9 import iro
     9 
    10 from ..worker import Worker
    10 from iro.worker import Worker
    11 from ..job import Job
    11 from iro.job import Job
    12 
    12 
    13 from logging.handlers import BufferingHandler
    13 from logging.handlers import BufferingHandler
    14 
    14 
    15 class MyHandler(BufferingHandler):
    15 class MyHandler(BufferingHandler):
    16     def __init__(self):
    16     def __init__(self,buffer=None):
    17         '''BufferingHandler takes a "capacity" argument
    17         '''BufferingHandler takes a "capacity" argument
    18         so as to know when to flush. As we're overriding
    18         so as to know when to flush. As we're overriding
    19         shouldFlush anyway, we can set a capacity of zero.
    19         shouldFlush anyway, we can set a capacity of zero.
    20         You can call flush() manually to clear out the
    20         You can call flush() manually to clear out the
    21         buffer.'''
    21         buffer.
       
    22             buffer: list-like object the log records are appended to; needed e.g. when logging across processes or threads'''
    22         BufferingHandler.__init__(self, 0)
    23         BufferingHandler.__init__(self, 0)
       
    24         self.buffer=buffer
    23 
    25 
    24     def shouldFlush(self):
    26     def shouldFlush(self):
    25         return False
    27         return False
    26 
    28 
    27     def emit(self, record):
    29     def emit(self, record):
       
    30         if record.exc_info:                                                      #sonst geht das append schief, weil nicht picklebar
       
    31             print record.exc_info
       
    32             record.exc_info=record.exc_info[:-1]                                              
    28         self.buffer.append(record.__dict__)
    33         self.buffer.append(record.__dict__)
    29 
    34 
    30 class MyManager(BaseManager):
    35 class MyManager(BaseManager):
    31     pass
    36     pass
    32 
       
    33 MyManager.register('Job', Job) 
    37 MyManager.register('Job', Job) 
    34     
    38 MyManager.register('list', list, ListProxy)
    35 
    39 
    36 class TestWorker(unittest.TestCase):
    40 class TestWorker(unittest.TestCase):
    37     def setUp(self):
    41     def setUp(self):
    38         self.handler = h = MyHandler()
    42         # create a queue and a manager
    39         self.logger = l = logging.getLogger()
    43         self.manager = MyManager()
       
    44         self.queue = Queue()
       
    45         
       
    46         # start the manager so that we can adjust the logging
       
    47         self.manager.start()
       
    48         self.setUpLogging()
       
    49 
       
    50         # start the actual worker process
       
    51         self.worker= Worker(self.queue)
       
    52         self.worker.start()
       
    53 
       
    54     def tearDown(self):
       
    55         # stop thread and queue
       
    56         self.stop()
       
    57         
       
    58         # undo the logging changes
       
    59         self.tearDownLogging()
       
    60         self.manager.shutdown()
       
    61     
       
    62     def stop(self):
       
    63         self.queue.close()
       
    64         self.queue.join_thread()
       
    65         self.worker.terminate()
       
    66 
       
    67 
       
    68     def setUpLogging(self):
       
    69         '''Rework logging so that all log messages end up in self.buffer'''
       
    70         # we need a thread-safe list
       
    71         self.buffer=self.manager.list()
       
    72         
       
    73         # create a handler that writes into the buffer
       
    74         self.handler = h = MyHandler(self.buffer)
       
    75         self.logger =l= logging.getLogger()
       
    76         
       
    77         # adjust the log levels
       
    78         l.setLevel(logging.DEBUG)
       
    79         h.setLevel(logging.DEBUG)
    40         l.addHandler(h)
    80         l.addHandler(h)
    41 
    81 
    42         self.manager = MyManager()
    82     def tearDownLogging(self):
    43         self.manager.start()
    83         '''Remove the crazy logging hacks again'''
    44         self.queue = Queue()
    84         self.logger.removeHandler(self.handler)
    45         self.worker=Worker(self.queue)
    85         self.logger.setLevel(logging.NOTSET)
    46         self.worker.start()
    86         self.handler.close()
    47         sleep(.5)
       
    48         
       
    49 
    87 
    50     def TearDown(self):
    88 
    51         self.manager.stop()
       
    52         self.queue.close()
       
    53         self.worker.terminate()
       
    54         self.logger.removeHandler(self.handler)
       
    55         self.handler.close()
       
    56     
       
    57     def testJob(self):
    89     def testJob(self):
    58         job=Job(None,None,"test")
    90         '''einen Job verarbeiten'''
    59         self.assertEqual(job.status,"init")
    91         job=self.manager.Job(None,None,"test")
       
    92         self.assertEqual(job.getStatus(),"init")
    60         self.queue.put(job)
    93         self.queue.put(job)
    61         sleep(.5)
    94         sleep(.1)
    62         logging.debug("test")
    95         self.stop()
    63         print self.handler.buffer
    96         self.assertEqual(job.getStatus(),"started")
    64         self.assertEqual(job.status,"started")
    97         self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
       
    98                 [(20,'Workerprocess läuft nun...'),
       
    99                  (20,'ein neuer Job(1)'),
       
   100                  (20,'Job(1) fertig ;)')])
       
   101 
    65 
   102 
    66 if __name__ == "__main__":
   103 if __name__ == "__main__":
    67     unittest.main()  
   104     unittest.main()