iro/tests/testWorker.py
changeset 45 4bde195af39f
parent 42 1a9f191389a4
child 48 32763e344d3b
equal deleted inserted replaced
44:e20909e61588 45:4bde195af39f
       
     1 # -*- coding: utf-8 -*-
       
     2 
       
     3 import unittest
       
     4 import logging
       
     5 from time import sleep
       
     6 
       
     7 from multiprocessing import Queue
       
     8 from multiprocessing.managers import BaseManager,ListProxy
       
     9 
       
    10 from iro.worker import Worker
       
    11 from iro.job import Job
       
    12 
       
    13 from logging.handlers import BufferingHandler
       
    14 
       
    15 class MyHandler(BufferingHandler):
       
    16     def __init__(self,buffer=None):
       
    17         '''BufferingHandler takes a "capacity" argument
       
    18         so as to know when to flush. As we're overriding
       
    19         shouldFlush anyway, we can set a capacity of zero.
       
    20         You can call flush() manually to clear out the
       
    21         buffer.
       
    22             buffer: log messages to this buffer, needed f.ex for processes or threads'''
       
    23         BufferingHandler.__init__(self, 0)
       
    24         self.buffer=buffer
       
    25 
       
    26     def shouldFlush(self):
       
    27         return False
       
    28 
       
    29     def emit(self, record):
       
    30         if record.exc_info:                                                      #sonst geht das append schief, weil nicht picklebar
       
    31             print record.exc_info
       
    32             record.exc_info=record.exc_info[:-1]                                              
       
    33         self.buffer.append(record.__dict__)
       
    34 
       
    35 
       
    36 class BadJob(Job):
       
    37     def start(self):
       
    38         Job.start(self)
       
    39         raise Exception("Error")
       
    40 
       
    41 #einen Manager anlegen, der Job und eine Liste anbietet
       
    42 class MyManager(BaseManager):
       
    43     pass
       
    44 MyManager.register('Job', Job) 
       
    45 MyManager.register('BadJob', BadJob) 
       
    46 MyManager.register('list', list, ListProxy)
       
    47 
       
    48 class TestWorker(unittest.TestCase):
       
    49     def setUp(self):
       
    50         #erstelle eine Queue&Manager
       
    51         self.manager = MyManager()
       
    52         self.queue = Queue()
       
    53         
       
    54         #manager starten, damit wir Logging anpassen können
       
    55         self.manager.start()
       
    56         self.setUpLogging()
       
    57 
       
    58         #eigentlich Workerprocess starten
       
    59         self.worker= Worker(self.queue)
       
    60         self.worker.start()
       
    61 
       
    62     def tearDown(self):
       
    63         #Thread&Queue stoppen
       
    64         self.stop()
       
    65         
       
    66         #Logging änderungen rückgängig
       
    67         self.tearDownLogging()
       
    68         self.manager.shutdown()
       
    69     
       
    70     def stop(self):
       
    71         self.queue.close()
       
    72         self.queue.join_thread()
       
    73         self.worker.terminate()
       
    74 
       
    75 
       
    76     def setUpLogging(self):
       
    77         '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
       
    78         #wir brauchen eine threadsichere liste
       
    79         self.buffer=self.manager.list()
       
    80         
       
    81         #Handler erstellen, der in den Buffer schreibt
       
    82         self.handler = h = MyHandler(self.buffer)
       
    83         self.logger =l= logging.getLogger()
       
    84         
       
    85         #Level anpassen 
       
    86         l.setLevel(logging.DEBUG)
       
    87         h.setLevel(logging.DEBUG)
       
    88         l.addHandler(h)
       
    89 
       
    90     def tearDownLogging(self):
       
    91         '''crazy logging hacks wieder entfernen'''
       
    92         self.logger.removeHandler(self.handler)
       
    93         self.logger.setLevel(logging.NOTSET)
       
    94         self.handler.close()
       
    95 
       
    96 
       
    97     def testJob(self):
       
    98         '''einen Job verarbeiten'''
       
    99         job=self.manager.Job(None,None,"test")
       
   100         self.assertEqual(job.getStatus(),"init")
       
   101         self.queue.put(job)
       
   102         sleep(.1)
       
   103         self.stop()
       
   104         self.assertEqual(job.getStatus(),"started")
       
   105         self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
       
   106                 [(20,'Workerprocess läuft nun...'),
       
   107                  (20,'ein neuer Job(1)'),
       
   108                  (20,'Job(1) fertig ;)')])
       
   109 
       
   110     def testBadJob(self):
       
   111         '''einen Job verarbeiten, der fehlschlägt'''
       
   112         job=self.manager.BadJob(None,None,"test")
       
   113         self.assertEqual(job.getStatus(),"init")
       
   114         self.queue.put(job)
       
   115         sleep(.1)
       
   116         self.stop()
       
   117         self.assertEqual(job.getStatus(),"error")
       
   118         print self.buffer
       
   119         self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
       
   120                 [(20,'Workerprocess läuft nun...'),
       
   121                  (20,'ein neuer Job(1)'),
       
   122                  (40,'Job(1) fehlgeschlagen :(')])
       
   123         error=Exception('Error')
       
   124         self.assertEqual(self.buffer[-1]['exc_info'][0],type(error))
       
   125         self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error))
       
   126 
       
   127 
       
   128 
       
   129 if __name__ == "__main__":
       
   130     unittest.main()