iro/test_helpers/smtp_helper.py
branchdevel
changeset 230 448dd8d36839
parent 164 b634a8fed51f
child 294 0e75bd39767d
equal deleted inserted replaced
229:1bd4c7f58b3f 230:448dd8d36839
       
     1 """
       
     2 A helper class which allows test code to intercept and store sent email.
       
     3 (from: http://lakin.weckers.net/thoughts/twisted/part1/threaded/)
       
     4 """
       
     5 import smtpd
       
     6 import asyncore
       
     7 
       
     8 class TestSMTPServer(smtpd.SMTPServer):
       
     9     """
       
    10     An SMTP Server used to allow integration tests which ensure email is sent.
       
    11     """
       
    12 
       
    13     def __init__(self, localaddr):
       
    14         self.rcvd = []
       
    15         smtpd.SMTPServer.__init__(self, localaddr, None)
       
    16 
       
    17     def start(self):
       
    18         import threading
       
    19         self.poller = threading.Thread(target=asyncore.loop,
       
    20                 kwargs={'timeout':0.1, 'use_poll':True})
       
    21         self.poller.start()
       
    22 
       
    23     def process_message(self, peer, mailfrom, rcpttos, data):
       
    24         self.rcvd.append((mailfrom, rcpttos, data))
       
    25 
       
    26     def close(self):
       
    27         smtpd.SMTPServer.close(self)
       
    28         self.poller.join()