tests/old/testWorker.py
branch devel
changeset 232 3f0d2a060e5d
parent 231 3929338fd17f
child 233 34435357dc8a
equal deleted inserted replaced
231:3929338fd17f 232:3f0d2a060e5d
     1 # -*- coding: utf-8 -*-
       
     2 
       
     3 import unittest
       
     4 import logging
       
     5 from time import sleep
       
     6 
       
     7 from multiprocessing import Queue
       
     8 from multiprocessing.managers import BaseManager, ListProxy
       
     9 
       
    10 from iro.worker import Worker
       
    11 from iro.job import Job, SMSJob
       
    12 from iro.anbieter.anbieter import anbieter
       
    13 from iro.anbieter.content import SMS
       
    14 from iro.providerlist import Providerlist
       
    15 
       
    16 from logging.handlers import BufferingHandler
       
    17 
       
    18 class MyHandler(BufferingHandler):
       
    19     def __init__(self,buffer=None):
       
    20         '''BufferingHandler takes a "capacity" argument
       
    21         so as to know when to flush. As we're overriding
       
    22         shouldFlush anyway, we can set a capacity of zero.
       
    23         You can call flush() manually to clear out the
       
    24         buffer.
       
    25             buffer: log messages to this buffer, needed f.ex for processes or threads'''
       
    26         BufferingHandler.__init__(self, 0)
       
    27         self.buffer=buffer
       
    28 
       
    29     def shouldFlush(self):
       
    30         return False
       
    31 
       
    32     def emit(self, record):
       
    33         if record.exc_info:                                                      #sonst geht das append schief, weil nicht picklebar
       
    34             record.exc_info=record.exc_info[:-1]                                              
       
    35         self.buffer.append(record.__dict__)
       
    36 
       
    37 
       
    38 class BadJob(Job):
       
    39     def start(self,id=None):
       
    40         Job.start(self,id)
       
    41         raise Exception("Error")
       
    42 
       
    43 #einen Manager anlegen, der Job und eine Liste anbietet
       
    44 class MyManager(BaseManager):
       
    45     pass
       
    46 MyManager.register('Job', Job) 
       
    47 MyManager.register('SMSJob', SMSJob) 
       
    48 MyManager.register('BadJob', BadJob) 
       
    49 MyManager.register('list', list, ListProxy)
       
    50 MyManager.register('Providerlist',Providerlist) 
       
    51 
       
    52 class TestWorker(unittest.TestCase):
       
    53     def setUp(self):
       
    54         #erstelle eine Queue&Manager
       
    55         self.manager = MyManager()
       
    56         self.queue = Queue()
       
    57         
       
    58         #manager starten, damit wir Logging anpassen können
       
    59         self.manager.start()
       
    60         self.setUpLogging()
       
    61 
       
    62         self.providerlist=self.manager.Providerlist()
       
    63         self.providerlist.add("test", anbieter() , ["sms",  ])
       
    64         
       
    65         #eigentlich Workerprocess starten
       
    66         self.worker= Worker(self.queue)
       
    67         self.worker.start()
       
    68 
       
    69     def tearDown(self):
       
    70         #Thread&Queue stoppen
       
    71         self.stop()
       
    72         
       
    73         #Logging änderungen rückgängig
       
    74         self.tearDownLogging()
       
    75         self.manager.shutdown()
       
    76     
       
    77     def stop(self):
       
    78         self.queue.close()
       
    79         self.queue.join_thread()
       
    80         self.worker.terminate()
       
    81 
       
    82 
       
    83     def setUpLogging(self):
       
    84         '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
       
    85         #wir brauchen eine threadsichere liste
       
    86         self.buffer=self.manager.list()
       
    87         
       
    88         #Handler erstellen, der in den Buffer schreibt
       
    89         self.handler = h = MyHandler(self.buffer)
       
    90         self.logger =l= logging.getLogger()
       
    91         
       
    92         #Level anpassen 
       
    93         l.setLevel(logging.DEBUG)
       
    94         h.setLevel(logging.DEBUG)
       
    95         l.addHandler(h)
       
    96 
       
    97     def tearDownLogging(self):
       
    98         '''crazy logging hacks wieder entfernen'''
       
    99         self.logger.removeHandler(self.handler)
       
   100         self.logger.setLevel(logging.NOTSET)
       
   101         self.handler.close()
       
   102 
       
   103 
       
   104     def testJob(self):
       
   105         '''einen Job verarbeiten'''
       
   106         job=self.manager.Job(None,None,"test")
       
   107         self.assertEqual(job.getStatus(),("init",{}))
       
   108         self.queue.put(job)
       
   109         sleep(.1)
       
   110         self.stop()
       
   111         self.assertEqual(job.getStatus(),("started",{}))
       
   112         self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
       
   113                 [(20,'Workerprocess läuft nun...'),
       
   114                  (20,'ein neuer Job(1)'),
       
   115                  (20,'Job(1) fertig ;)')])
       
   116 
       
   117     def testBadJob(self):
       
   118         '''einen Job verarbeiten, der fehlschlägt'''
       
   119         job=self.manager.BadJob(None,None,"test")
       
   120         self.assertEqual(job.getStatus(),("init",{}))
       
   121         self.queue.put(job)
       
   122         sleep(.1)
       
   123         self.stop()
       
   124         self.assertEqual(job.getStatus(),("error",{}))
       
   125         self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
       
   126                 [(20,'Workerprocess läuft nun...'),
       
   127                  (20,'ein neuer Job(1)'),
       
   128                  (40,'Job(1) fehlgeschlagen :(')])
       
   129         error=Exception('Error')
       
   130         self.assertEqual(self.buffer[-1]['exc_info'][0],type(error))
       
   131         self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error))
       
   132 
       
   133     def testSMSJob(self):
       
   134         job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345])
       
   135         self.assertEqual(job.getStatus(),("init",{}))
       
   136         self.queue.put(job)
       
   137         sleep(.1)
       
   138         self.stop()
       
   139         self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []}))
       
   140         self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
       
   141                 [(20,'Workerprocess läuft nun...'),
       
   142                  (20,'ein neuer Job(1)'),
       
   143                  (20,'Job(1) fertig ;)')])
       
   144 
       
   145 
       
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()