# HG changeset patch # User Sandro Knauß # Date 1332075987 -3600 # Node ID 3929338fd17fdcbe11fa184bfbfb5d0705a32ec9 # Parent 448dd8d36839505f65fe40ce85da2ca9138be1fa moving tests -> iro.tests diff -r 448dd8d36839 -r 3929338fd17f iro/tests/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/__init__.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,11 @@ +import db +import config +import email_validate, telnumber, validate, model_validate +import offer, job, task +import smtp, smstrade, offer_integrated + +import reload, xmlrpc +import install +import viewinterface + +__all__=[db] diff -r 448dd8d36839 -r 3929338fd17f iro/tests/config.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/config.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,187 @@ +from mock import patch, Mock +from twisted.trial import unittest +import signal +import io +import ConfigParser +from iro import config, error +from iro.offer.provider import Provider, providers + +class TestModuleConfig(unittest.TestCase): + '''test config class''' + + def setUp(self): + self._reloadList=config.configParser.reloadList + config.configParser.reloadList=[] + + def tearDown(self): + config.configParser.reloadList = self._reloadList + + @patch('iro.config.main') + @patch('iro.config.configParser') + def testReadConfig(self,pConfig,pMain): + pMain._init = True + config.readConfig() + self.assertEqual([i[0] for i in pConfig.method_calls],["read","reload_","items"]) + pConfig.read.assert_called_once_with(config.confFiles) + pConfig.items.assert_called_once_with("main") + pConfig.reload_.assert_called_once_with() + self.assertEqual(pMain.load.called,1) + + @patch('iro.config.main') + @patch('iro.config.configParser') + def testReadConfigInit(self,pConfig,pMain): + pConfig.items.return_value = [('dburl',''),("port",1000)] + pMain._init = False + config.readConfig() + self.assertEqual([i[0] for i in pConfig.method_calls],["read","reload_","items"]) + self.assertEqual([i[0] for i in pMain.method_calls],["same"]) + sa = pMain.same.call_args_list[0][0][0] + self.assertEqual(sa.port,1000) + self.assertEqual(sa.dburl,'') + pMain.same.return_value = False + self.assertRaises(Exception,config.readConfig,) + + + + @patch('signal.signal') + @patch('iro.config.readConfig') + def testRegisterSignal(self, pReadConfig, pSignal): + config.registerSignal() + self.assertEqual(pSignal.call_args[0][0],signal.SIGUSR2) + self.assertEqual(pReadConfig.called,0) + pSignal.call_args[0][1](None, None) + pReadConfig.assert_called_once_with() + + def testRegisterReload(self): + def x(): + pass + config.configParser.registerReload(x) + self.assertEqual(config.configParser.reloadList,[x]) + + def testReload(self): + x = Mock() + config.configParser.reloadList = [x] + config.configParser.reload_() + x.assert_called_once_with() + + +class TestRead(unittest.TestCase): + + def tearDown(self): + for s in config.configParser.sections(): + config.configParser.remove_section(s) + + @patch('iro.config.ConfigParser.read') + def testMain(self,pRead): + sample_config = """[main] +port = 8000 +dburl = sdfdsafgsfdg +""" + config.configParser.readfp(io.BytesIO(sample_config)) + config.configParser.read([]) + pRead.assert_called_once_with(config.configParser,[]) + + @patch('iro.config.ConfigParser.read') + def testMainBadPort(self,pRead): + sample_config = """[main] +port = -8000 +dburl = sadfaserasg +""" + config.configParser.readfp(io.BytesIO(sample_config)) + self.assertRaises(error.ValidateException, config.configParser.read, []) + + @patch('iro.config.ConfigParser.read') + def testMainNoMust(self,pRead): + sample_config = """[main] +port = 8000 +""" + config.configParser.readfp(io.BytesIO(sample_config)) + self.assertRaises(config.NeededOption, config.configParser.read, []) + + @patch('iro.config.ConfigParser.read') + def testMust(self,pRead): + v=Mock() + config.main.options["test"] = config.Option(v,default="jehei") + try: + sample_config = """[main] +dburl = sdfawersdf +port = 8000 +""" + config.configParser.readfp(io.BytesIO(sample_config)) + config.configParser.read([]) + self.assertEqual(v.called,0) + config.main.load(config.configParser.items("main")) + self.assertEqual(config.main.test,'jehei') + sample_config = """[main] +dburl = adfgsdftsfg +port = 8000 +test = foohu + """ + config.configParser.readfp(io.BytesIO(sample_config)) + config.configParser.read([]) + v.assert_called_once_with("foohu","test") + finally: + del(config.main.options["test"]) + + @patch('iro.config.ConfigParser.read') + def testProviders(self, pRead): + v=Mock() + class TestProvider(Provider): + def __init__(self,name): + Provider.__init__(self,name) + self.options.update({"test":config.Option(v)}) + providers["test"]=TestProvider + try: + sample_config = """[p] +""" + config.configParser.readfp(io.BytesIO(sample_config)) + self.assertRaises(ConfigParser.NoOptionError, config.configParser.read, []) + self.assertEqual(v.called,0) + sample_config = """[p] +typ= test +test= foo +""" + config.configParser.readfp(io.BytesIO(sample_config)) + config.configParser.read([]) + v.assert_called_once_with("foo","test") + finally: + del(providers["test"]) + + +class TestConfig(unittest.TestCase): + def testSame(self): + c1 = config.Config("1") + c1.port = 1 + c1.dburl = "dburl1" + c2 = config.Config("2") + c2.port = 1 + c2.dburl = "dburl1" + self.assertTrue(c1.same(c2)) + self.assertTrue(c1.same(c1)) + self.assertTrue(c2.same(c1)) + c2.port = 2 + self.assertFalse(c2.same(c1)) + self.assertFalse(c1.same(c2)) + c2.port = 1 + c2.dburl = "dburl2" + self.assertFalse(c2.same(c1)) + self.assertFalse(c1.same(c2)) + + def testSampleConf(self): + c1 = config.Config("1") + self.assertEqual(c1.sampleConf(),["[1]", + "# Connection URL to database", + "dburl = ","", + "# Port under that twisted is running", + "port = ",""]) + + def testsampleConfDefault(self): + c1 = config.Config("1") + c1.options["port"].default = 12345 + c1.options["port"].must = False + c1.options["dburl"].default = True + self.assertEqual(c1.sampleConf(),["[1]", + "# Connection URL to database", + "dburl = True","", + "# Port under that twisted is running", + "# port = 12345",""]) diff -r 448dd8d36839 -r 3929338fd17f iro/tests/db.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/db.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,286 @@ +import unittest +from sqlalchemy.orm.exc import MultipleResultsFound +from datetime import datetime + +from iro.model.schema import User, Offer, Userright, Job, Message +from decimal import Decimal + +from ..test_helpers.dbtestcase import DBTestCase + +class DBTests(DBTestCase): + """tests for the db model""" + + def testRoutes(self): + '''test routes''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.routes('sms').all(),[('sipgate_basic',),]) + + def testRoutesNoDefault(self): + '''test default routes''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.routes('sms',default=True).all(),[]) + + def testRoutesDefault(self): + '''test default routes''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o,default=1)) + session.add(u) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.routes('sms',default=True).all(),[('sipgate_basic',),]) + + def testRoutesDefaultOrdering(self): + '''test default routes ordering''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="s1", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o,default=3)) + o=Offer(name="s2", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o,default=None)) + o=Offer(name="s3", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o,default=1)) + session.add(u) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.routes('sms',default=True).all(),[('s3',),('s1',)]) + + def testProviders(self): + '''test providers''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.providers('sms').all(),[('sipgate',),]) + + def testTyps(self): + with self.session() as session: + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + session.add(o) + + with self.session() as session: + self.assertEqual(Offer.typs(session).all(),[('sms',),]) + + with self.session() as session: + o=Offer(name="s2", provider="sipgate", route="basic", typ="sms") + session.add(o) + o=Offer(name="s3", provider="sipgate", route="basic", typ="sms2") + session.add(o) + + with self.session() as session: + self.assertEqual(Offer.typs(session).order_by(Offer.typ).all(),[('sms',),('sms2',)]) + + def testOfferRoutes(self): + with self.session() as session: + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + session.add(o) + + with self.session() as session: + self.assertEqual([o.name for o in Offer.routes(session, "sms").order_by(Offer.name)],["sipgate_basic"]) + + with self.session() as session: + o=Offer(name="s2", provider="sipgate", route="basic", typ="sms") + session.add(o) + o=Offer(name="s3", provider="sipgate", route="basic", typ="sms2") + session.add(o) + + with self.session() as session: + self.assertEqual([o.name for o in Offer.routes(session, "sms2").order_by(Offer.name)],["s3"]) + self.assertEqual([o.name for o in Offer.routes(session, "sms").order_by(Offer.name)],["s2","sipgate_basic"]) + + def testOfferProviders(self): + with self.session() as session: + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + session.add(o) + + with self.session() as session: + self.assertEqual([o.provider for o in Offer.providers(session, "sms").order_by(Offer.provider)],["sipgate"]) + + with self.session() as session: + o=Offer(name="s2", provider="sipgate2", route="basic", typ="sms") + session.add(o) + o=Offer(name="s3", provider="sipgate", route="basic", typ="sms2") + session.add(o) + + with self.session() as session: + self.assertEqual([o.provider for o in Offer.providers(session, "sms2").order_by(Offer.provider)],["sipgate"]) + self.assertEqual([o.provider for o in Offer.providers(session, "sms").order_by(Offer.provider)],["sipgate","sipgate2"]) + +class Has_RightTests(DBTestCase): + '''test User.has_right''' + def testSimple(self): + '''test a very simple case''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.has_right("sms"),"sipgate_basic") + self.assertEqual(u.has_right("sms", offer_name="sipgate_basic"),"sipgate_basic") + self.assertEqual(u.has_right("sms", route="basic"),"sipgate_basic") + self.assertEqual(u.has_right("sms", provider="sipgate"),"sipgate_basic") + + def testDouble(self): + '''now we have different routes''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="b", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o)) + o=Offer(name="c", provider="sipgate", route="c", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.has_right("sms", offer_name="b"),"b") + self.assertEqual(u.has_right("sms", route="basic"),"b") + self.assertEqual(u.has_right("sms", offer_name="c"),"c") + self.assertEqual(u.has_right("sms", route="c"),"c") + self.assertRaises(MultipleResultsFound, u.has_right,"sms") + self.assertRaises(MultipleResultsFound, u.has_right,"sms", provider="sipgate") + + def testUnknown(self): + ''' a unknown typ''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="b", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o)) + o=Offer(name="c", provider="sipgate", route="c", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.has_right("fax", offer_name="b"), None) + self.assertEqual(u.has_right("fax"), None) + + + +class BillTest(DBTestCase): + """test the bill function""" + + def testBill(self): + '''test bill function''' + apikey='abcdef123456789' + with self.session() as session: + u=User(name='test',apikey=apikey) + session.add(u) + o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") + j = Job(info='i',status='sended') + m = Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.30, offer=o, job=j) + u.rights.append(Userright(o)) + u.jobs.append(j) + session.add(m) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.rights[0].bill.all(),[(1L,Decimal('0.3000'),'i')]) + + def testBillAgreget(self): + apikey='abcdef123456789' + with self.session() as session: + u=User(name='test',apikey=apikey) + session.add(u) + o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") + j = Job(info='i',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.30, offer=o)) + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) + u.rights.append(Userright(o)) + u.jobs.append(j) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.rights[0].bill.all(),[(2L,Decimal('0.7000'),'i')]) + + def testBillIsBilled(self): + apikey='abcdef123456789' + with self.session() as session: + u=User(name='test',apikey=apikey) + session.add(u) + o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") + j = Job(info='i',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=True, date=datetime.now() , price=0.30, offer=o)) + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) + u.rights.append(Userright(o)) + u.jobs.append(j) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.rights[0].bill.all(),[(1L,Decimal('0.4000'),'i')]) + + def testBillMultipleJobs(self): + apikey='abcdef123456789' + with self.session() as session: + u=User(name='test',apikey=apikey) + session.add(u) + o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") + u.rights.append(Userright(o)) + + j = Job(info='i',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) + u.jobs.append(j) + + j = Job(info='a',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o)) + u.jobs.append(j) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.rights[0].bill.order_by(Job.info).all(), + [(1L,Decimal('0.4000'),'a'), + (1L,Decimal('0.4000'),'i') + ]) + + def testBillMultipleOffers(self): + apikey='abcdef123456789' + with self.session() as session: + u=User(name='test',apikey=apikey) + session.add(u) + o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") + u.rights.append(Userright(o)) + + j = Job(info='a',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o)) + u.jobs.append(j) + + o = Offer(name='sipgate_gold',provider="sipgate",route="gold",typ="sms") + u.rights.append(Userright(offer=o)) + + j = Job(info='a',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.5, offer=o)) + u.jobs.append(j) + + with self.session() as session: + u=session.merge(u) + self.assertEqual(u.rights[0].bill.all(), + [(1L,Decimal('0.4000'),'a')]) + + self.assertEqual(u.rights[1].bill.all(),[(1L,Decimal('0.5000'),'a')]) + + +if __name__ == '__main__': + unittest.main() diff -r 448dd8d36839 -r 3929338fd17f iro/tests/email_validate.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/email_validate.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,65 @@ +import unittest + +from iro.validate import vEmail +from iro.error import InvalidMail, ValidateException + + +class testEmail(unittest.TestCase): + """tests for email adresses""" + + def testVaildEmail(self): + '''test vaild email adresses (got from wikipedia)''' + validmails=["niceandsimple@example.com", + "simplewith+symbol@example.com", + 'a.little.unusual@example.com', + 'a.little.more.unusual@dept.example.com', + '@[10.10.10.10]', + '@[1.1.1.1]', + '@[200.100.100.100]', + 'user@[2001:db8:1ff::a0b:dbd0]', + '"much.more\ unusual"@example.com', + 't."test".t@example.com', + '"very.unusual.@.unusual.com"@example.com', + '"very.(),:;<>[]\\".VERY.\\"very@\\\ \\"very\\".unusual"@strange.example.com', + "!#$%&'*+-/=?^_`{}|~@example.org", + '"()<>[]:;@,\\\"!#$%&\'*+-/=?^_`{}| ~ ? ^_`{}|~."@example.org', + '""@example.org'] + + for num in validmails: + self.failUnlessEqual(vEmail([num],None),[num]) + + def testInvaildEmail(self): + '''test invaild email adresses (got from wikipedia)''' + invalid=["Abc.example.com", # (an @ character must separate the local and domain parts) + "Abc.@example.com", # (character dot(.) is last in local part) + "Abc..123@example.com", # (character dot(.) is double) + "A@b@c@example.com", # (only one @ is allowed outside quotation marks) + 'a"b(c)d,e:f;gi[j\k]l@example.com', # (none of the special characters in this local part is allowed outside quotation marks) + 'just"not"right@example.com', # (quoted strings must be dot separated, or the only element making up the local-part) + 'thisis."notallowed@example.com', # (spaces, quotes, and backslashes may only exist when within quoted strings and preceded by a slash) + 'this\\ still\\"not\\allowed@example.com', # (even if escaped (preceded by a backslash), spaces, quotes, and backslashes must still be contained by quotes) + 't."test".t"test".t@example.com', + 't."test"t."test".t@example.com', + ] + + for number in invalid: + with self.assertRaises(InvalidMail) as e: + vEmail([number],None) + self.failUnlessEqual(e.exception.number, number) + + def testInvalidDomain(self): + '''invalid Domainname''' + with self.assertRaises(InvalidMail): + vEmail(['x@&.de'],None) + + def testDoubles(self): + self.assertEqual(vEmail(['x@test.de','x@test.de'],None),["x@test.de"]) + + def testString(self): + self.assertEqual(vEmail('x@test.de', None),"x@test.de") + + def testAllowString(self): + self.assertRaises(ValidateException,vEmail, 'x@test.de', None, allowString=False) + + def testAllowList(self): + self.assertRaises(ValidateException,vEmail, ['x@test.de'], None, allowList=False) diff -r 448dd8d36839 -r 3929338fd17f iro/tests/install.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/install.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,110 @@ +#from mock import patch, Mock +from twisted.trial import unittest +from twisted.python import log +from sqlalchemy import create_engine +import os + +from iro import install +from iro import config +from iro.model.schema import Base + +from ..test_helpers.dbtestcase import md, SampleDatabase + +class DummyObserver(object): + def __init__(self): + self.e=[] + + def start(self): + log.addObserver(self.emit) + + def stop(self): + log.removeObserver(self.emit) + + def emit(self, eventDict): + self.e.append(eventDict) + + +class TestInstallation(unittest.TestCase): + '''test install script''' + + def setUp(self): + md.setUp() + if not hasattr(md,"db2"): + md.db2=SampleDatabase("test2","test2",'%s/my.cnf'%md.tdir) + md.dburl2='mysql://test2:test@localhost/test2?unix_socket=%s/socket'%md.tdir + md.db2.create() + self.log = DummyObserver() + self.log.start() + self.engine = None + + def tearDown(self): + self.log.stop() + if self.engine: + Base.metadata.drop_all(self.engine) + self.engine = None + try: + os.remove("iro.conf") + except OSError as e: + if e.errno != 2: + raise + + + + def testCheckConfig(self): + self.assertEqual(install.checkConfig(),False) + with open("iro.conf",'w') as fp: + fp.write("""[main] +dburl=foo""") + self.assertEqual(install.checkConfig(),False) + self.assertEqual(len(self.log.e),1) + self.assertEqual(self.log.e[0]['message'], ("Error while processing config file: Option 'port' in section 'main' is missing.",)) + with open("iro.conf",'w') as fp: + fp.write("""[main] +dburl=foo +port=123456 +""") + self.assertEqual(install.checkConfig(),True) + + def testCheckDatabase(self): + config.main.dburl=md.dburl2 + self.assertTrue(install.checkDatabaseConnection()) + self.assertFalse(install.checkDatabase()) + self.engine = create_engine(md.dburl2) + Base.metadata.create_all(self.engine) + self.assertTrue(install.checkDatabase()) + + def testCheckDatabaseConnection(self): + config.main.dburl="mysql://t@localhost/test" + self.assertFalse(install.checkDatabaseConnection()) + self.assertEqual(len(self.log.e),1) + self.assertTrue(self.log.e[0]['message'][0].startswith("Error while trying to connect to database\n")) + config.main.dburl="sqlite://" + self.assertTrue(install.checkDatabaseConnection()) + + + def testCreateDatabase(self): + config.main.dburl=md.dburl2 + self.assertTrue(install.checkDatabaseConnection()) + self.assertFalse(install.checkDatabase()) + install.createDatabase() + self.assertTrue(install.checkDatabase()) + + def testCreateSampleConfig(self): + install.createSampleConfig() + with open("iro.conf",'r') as fp: + c = fp.read() + with open(os.path.abspath("../iro.conf.inst"),"r") as fp: + self.assertEqual(c,fp.read()) + + def testNoOverwrite(self): + with open("iro.conf","w") as fp: + fp.write("muhaha") + install.createSampleConfig() + with open("iro.conf",'r') as fp: + self.assertEqual(fp.read(),"muhaha") + self.assertEqual(len(self.log.e),1) + self.assertEqual(self.log.e[0]['message'], ("iro.conf exists and will not be overwritten.",)) + + def testCheck(self): + pass + testCheck.todo = "to implement" diff -r 448dd8d36839 -r 3929338fd17f iro/tests/job.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/job.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,194 @@ +from datetime import datetime +from decimal import Decimal +from mock import patch + +from iro.model.job import exJobs, ExJob +from iro.model.pool import data +from iro.model.message import SMS +from iro.model.status import Status +from iro.model.schema import Job, User, Offer as DBOffer, Userright + +from iro.controller.task import Task + +from iro.offer.provider import Provider + +from iro.telnumber import Telnumber +from iro.validate import vInteger + +from ..test_helpers.dbtestcase import DBTestCase + +class DummyPool(): + def run(self, f,*a,**k): + return f(*a,**k) + +from twisted.python import log + +class DummyObserver(object): + def __init__(self): + self.e=[] + + def start(self): + log.addObserver(self.emit) + + def stop(self): + log.removeObserver(self.emit) + + def emit(self, eventDict): + self.e.append(eventDict) + +class JobTestCase(DBTestCase): + def setUp(self): + DBTestCase.setUp(self) + self.pool = data.pool + data.pool = DummyPool() + + def tearDown(self): + exJobs.clear() + data.pool = self.pool + self.pool = None + DBTestCase.tearDown(self) + + +class exJobsTest(JobTestCase): + '''tests for exJobs''' + + def testCreate(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + + job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) + self.assertIsInstance(job, ExJob) + self.assertTrue(vInteger(job.dbjob, None, minv=0 )) + self.assertEqual(job.message, SMS('test')) + self.assertEqual(job.recipients, [Telnumber('123456789')]) + self.assertEqual(job.offers,[]) + self.assertEqual(job.tasks,{}) + + with self.session() as session: + j = session.query(Job.id).all() + self.assertEqual(j,[(job.dbjob,)]) + + self.assertEqual(exJobs[job.dbjob],job) + + def testCreate2(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="test", provider="bla", route="basic", typ="sms") + u.rights.append(Userright(o)) + + job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) + self.assertEqual(job.offers,['test']) + + def testGet(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + + job = ExJob(None, [Telnumber('123456789')], SMS('test'), ['test']) + exJobs[1] = job + + self.assertEqual(len(exJobs), 1) + self.assertEqual(job, exJobs[1]) + + def testGetFromDB(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + job = Job( info="info", status="started") + u.jobs.append(job) + session.add(u) + + with self.session() as session: + job = session.merge(job) + u = session.merge(u) + ejob= ExJob(job.id, [Telnumber('123456789')], SMS('test'), ['test']) + exJobs[job.id]=ejob + self.assertEqual(job.extend, ejob) + self.assertEqual(u.jobs[0].extend,ejob) + + def testUnknownExJob(self): + self.assertRaises(KeyError,exJobs.__getitem__,'a1234567890') + + +class StatiTest(JobTestCase): + def setUp(self): + JobTestCase.setUp(self) + self.log = DummyObserver() + self.log.start() + + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="test", provider="bla", route="a", typ="sms") + u.rights.append(Userright(o)) + + self.user = u + self.offer = o + + self.provider=Provider("bla", {"sms":["a","b","c"]}) + self.job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), []) + + def tearDown(self): + self.log.stop() + JobTestCase.tearDown(self) + + + def testSetError(self): + self.job.setError(Task(Telnumber('123456789'),self),Exception("muhaha")) + errors = self.flushLoggedErrors(Exception) + self.assertEqual(len(errors), 1) + self.assertEqual(self.log.e[0]['why'], "Error: Job(%s) to '0049123456789' failed."%self.job.dbjob) + + with self.session() as session: + u = session.merge(self.user) + job = u.job(self.job.dbjob) + self.assertEqual(job.status,"error") + + def testSetStatus(self): + task = Task(Telnumber('123456789'),self.job) + status = Status(self.provider,"a") + self.job.setStatus(task, status) + + self.assertEqual(self.log.e[0]['message'], ("Job(%s) to '0049123456789' ended sucecessfully via bla:a."%self.job.dbjob,)) + + with self.session() as session: + u = session.merge(self.user) + job = u.job(self.job.dbjob) + self.assertEqual(job.status,"sended") + self.assertEqual(len(job.messages),0) + + def testMultipleRecipients(self): + self.job.recipients.append(Telnumber("01234567890")) + task = Task(Telnumber('123456789'),self.job) + status = Status(self.provider,"a") + self.job.setStatus(task, status) + + with self.session() as session: + u = session.merge(self.user) + job = u.job(self.job.dbjob) + self.assertEqual(job.status,"sending") + + @patch("iro.model.job.datetime") + def testCosts(self,p_dt): + p_dt.today.return_value = datetime(2000, 1, 2, 3, 4, 5) + task = Task(Telnumber('123456789'),self.job) + status = Status(self.provider,"a",costs=0.055,exID="12345678",count=1) + + self.job.setStatus(task, status) + + with self.session() as session: + u = session.merge(self.user) + o = session.merge(self.offer) + job = u.job(self.job.dbjob) + self.assertEqual(job.status,"sended") + self.assertEqual(len(job.messages),1) + + msg = job.messages[0] + self.assertEqual(msg.price,Decimal('0.0550')) + self.assertEqual(msg.isBilled,False) + self.assertEqual(msg.recipient,str(Telnumber('123456789'))) + self.assertEqual(msg.date,datetime(2000, 1, 2, 3, 4, 5)) + self.assertEqual(msg.offer,o) + self.assertEqual(msg.exID,"12345678") + self.assertEqual(msg.count,1) diff -r 448dd8d36839 -r 3929338fd17f iro/tests/model_validate.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/model_validate.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,57 @@ +from iro.model.schema import Offer +from iro.model.decorators import vRoute, vTyp +from iro.model.pool import data + +from iro.error import ValidateException + +from ..test_helpers.dbtestcase import DBTestCase + +class DummyPool(): + def run(self, f,*a,**k): + return f(*a,**k) + +class ModelVaidatorTest(DBTestCase): + """tests for the model vaidators""" + def setUp(self): + DBTestCase.setUp(self) + self.pool = data.pool + data.pool = DummyPool() + + def tearDown(self): + data.pool = self.pool + self.pool = None + DBTestCase.tearDown(self) + + def testTyp(self): + with self.session() as session: + session.add(Offer(name="t",provider="p",typ="type")) + + with self.session() as session: + self.assertEqual(vTyp("type",None),"type") + e = self.assertRaises(ValidateException,vTyp, "sss", None) + self.assertEqual(str(e),'700: Typ sss is not valid.') + + def testRoute(self): + with self.session() as session: + session.add(Offer(name="t",provider="p",typ="type")) + self.assertEqual(vRoute("t",None,typ="type"),"t") + self.assertEqual(vRoute(["t","t"],None,typ="type"),["t"]) + e = self.assertRaises(ValidateException,vRoute, "s", None, typ="type") + self.assertEqual(str(e),'700: Route s is not valid.') + + def testRouteAllow(self): + with self.session() as session: + session.add(Offer(name="t",provider="p",typ="type")) + e = self.assertRaises(ValidateException,vRoute, "t", "foo", typ="type", allowString=False) + self.assertEqual(str(e),'700: foo must be a list of routes.') + e = self.assertRaises(ValidateException,vRoute, ["t"], "foo", typ="type", allowList=False) + self.assertEqual(str(e),'700: foo must be a route - No list of routes.') + + def testRouteProvider(self): + with self.session() as session: + session.add(Offer(name="t",provider="p",typ="type")) + self.assertEqual(vRoute("p",None,typ="type"),"p") + + def testRouteDefault(self): + self.assertEqual(vRoute("default",None,typ="type"),"default") + diff -r 448dd8d36839 -r 3929338fd17f iro/tests/offer.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/offer.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,156 @@ +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks + +import io + +from iro.model import offer +from iro.config import configParser +from iro.offer import Offer, providers as OfferProviders, Provider +from iro.model.schema import User, Offer as DBOffer, Userright +from iro.controller.pool import dbPool +from iro.error import NoProvider +from ..test_helpers.dbtestcase import DBTestCase + +class TestOffers(DBTestCase): + '''test config class''' + def setUp(self): + DBTestCase.setUp(self) + dbPool.start(reactor) + + def tearDown(self): + for s in configParser.sections(): + configParser.remove_section(s) + + dbPool.pool.stop() + offer.offers.clear() + + try: + del(OfferProviders["test"]) + except KeyError: + pass + + offer.providers.clear() + DBTestCase.tearDown(self) + + def testReloadList(self): + '''test if loadOffers will be fired by reloading Config''' + self.assertTrue(offer.loadOffers in configParser.reloadList) + + @inlineCallbacks + def testExtendProviderUnknown(self): + '''test the extendProvider Function (route unknown)''' + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + + ret = yield offer.extendProvider(u, "sms", ["blub"]) + self.assertEqual(ret, []) + + @inlineCallbacks + def testExtendProviderKnown(self): + '''test the extendProvider Function (route known)''' + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="blub", provider="bla", route="basic", typ="sms") + u.rights.append(Userright(o)) + + ret = yield offer.extendProvider(u, "sms", ["blub"]) + self.assertEqual(ret, ["blub"]) + + ret = yield offer.extendProvider(u, "sms", ["blub", "blub"]) + self.assertEqual(ret, ["blub"]) + + @inlineCallbacks + def testExtendProviderProvider(self): + '''test the extendProvider Function (provider known)''' + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="oh", provider="bla", route="a", typ="sms") + u.rights.append(Userright(o)) + + offer.providers={"bla":Provider("bla", {"sms":["a","b","c"]})} + + for l in [['bla'],['oh'],['oh','bla'],['bla','oh']]: + ret = yield offer.extendProvider(u, "sms", l) + self.assertEqual(ret, ["oh"]) + + @inlineCallbacks + def testExtendProviderDouble(self): + '''test the extendProvider Function (provider and name doubles)''' + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="oh", provider="bla", route="a", typ="sms") + u.rights.append(Userright(o)) + o=DBOffer(name="a", provider="bla", route="b", typ="sms") + u.rights.append(Userright(o)) + + offer.providers={"bla":Provider("bla", {"sms":["a","b","c"]})} + + ret = yield offer.extendProvider(u, "sms", ["bla"]) + self.assertEqual(ret, ["oh","a"]) + + ret = yield offer.extendProvider(u, "sms", ["a","bla"]) + self.assertEqual(ret, ["a","oh"]) + + ret = yield offer.extendProvider(u, "sms", ["bla", "a"]) + self.assertEqual(ret, ["oh","a"]) + + @inlineCallbacks + def testExtendProviderDefault(self): + '''test the extendProvider Function (default)''' + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="blub3", provider="bla", route="basic", typ="sms") + u.rights.append(Userright(o,default=2)) + o=DBOffer(name="blub", provider="bla", route="basic", typ="sms") + u.rights.append(Userright(o,default=1)) + o=DBOffer(name="blub2", provider="bla2", route="basic", typ="sms") + u.rights.append(Userright(o)) + o=DBOffer(name="blub4", provider="bla", route="basic", typ="sms") + u.rights.append(Userright(o,default=3)) + + ret = yield offer.extendProvider(u, "sms", ["default"]) + self.assertEqual(ret, ["blub","blub3",'blub4']) + + ret = yield offer.extendProvider(u, "sms", "default") + self.assertEqual(ret, ["blub","blub3", 'blub4']) + + + @inlineCallbacks + def testLoadOffers(self): + + class TestProvider(Provider): + def __init__(self,name): + Provider.__init__(self,name,{"sms":["a",]}) + + with self.session() as session: + session.add(DBOffer(name="oh", provider="p", route="a", typ="sms")) + + sample_config = """[p] +typ = test +""" + configParser.readfp(io.BytesIO(sample_config)) + OfferProviders["test"]=TestProvider + yield offer.loadOffers() + self.assertEqual(offer.offers.keys(),["oh"]) + self.assertEqual(offer.providers.keys(),["p"]) + self.assertEqual(offer.providers["p"].name,"p") + self.assertEqual(offer.providers["p"].typ,"test") + self.assertEqual(offer.offers["oh"],Offer(provider=offer.providers["p"], name="oh", route="a", typ="sms")) + + def testLoadOffersUnknown(self): + with self.session() as session: + session.add(DBOffer(name="oh", provider="p", route="a", typ="sms")) + + sample_config = """[p] +typ = unknown +""" + configParser.readfp(io.BytesIO(sample_config)) + d = offer.loadOffers() + self.assertFailure(d, NoProvider) + return d + diff -r 448dd8d36839 -r 3929338fd17f iro/tests/offer_integrated.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/offer_integrated.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,215 @@ +from twisted.internet.defer import inlineCallbacks + +from datetime import datetime +from decimal import Decimal +from mock import patch, Mock + +from iro.model.job import exJobs +from iro.model.pool import data +from iro.model.schema import User, Offer as DBOffer, Userright +from iro.model.message import SMS, Mail +from iro.model import offer + +from iro.controller.task import Task, taskPool +from iro.telnumber import Telnumber + +from iro.offer import Smstrade, SMTP, Offer +from iro.offer.smstrade import SmstradeException, StatusCode + +from ..test_helpers.dbtestcase import DBTestCase + +class DummyPool(): + def run(self, f,*a,**k): + return f(*a,**k) + +def run( f,*a,**k): + return f(*a,**k) + +from twisted.python import log + +class DummyObserver(object): + def __init__(self): + self.e=[] + + def start(self): + log.addObserver(self.emit) + + def stop(self): + log.removeObserver(self.emit) + + def emit(self, eventDict): + self.e.append(eventDict) + + + +class IntegratedOfferTests(DBTestCase): + def setUp(self): + DBTestCase.setUp(self) + self.pool = data.pool + data.pool = DummyPool() + + self.taskPool = taskPool.run + taskPool.run = run + + self.log = DummyObserver() + self.log.start() + + def tearDown(self): + self.log.stop() + exJobs.clear() + data.pool = self.pool + self.pool = None + taskPool.run = self.taskPool + self.taskPool = None + DBTestCase.tearDown(self) + + + @patch("iro.model.job.datetime") + @patch("urllib.urlopen") + @inlineCallbacks + def testSmstrade(self, p_u, p_dt): + f = Mock() + f.readlines.return_value = ["100","12345678","0.055","1"] + p_u.return_value = f + + p_dt.today.return_value = datetime(2000, 1, 2, 3, 4, 5) + + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="s", provider="bla", route="basic", typ="sms") + u.rights.append(Userright(o)) + + + offer.providers["bla"] = Smstrade("bla") + offer.providers["bla"].key = "XXXXXX" + offer.offers["s"] = Offer("s",offer.providers["bla"],"basic","sms") + + j = yield exJobs.create(u,[Telnumber("0123456789")],SMS("bla"),['s'],'tesched') + t = Task(Telnumber("0123456789"),j) + yield t.start() + + self.assertEqual(self.log.e[0]['message'], ("Job(%s) to '0049123456789' ended sucecessfully via bla:basic."%j.dbjob,)) + + with self.session() as session: + u = session.merge(u) + o = session.merge(o) + job = u.job(j.dbjob) + self.assertEqual(job.status,"sended") + self.assertEqual(job.info,"tesched") + self.assertEqual(len(job.messages),1) + + msg = job.messages[0] + self.assertEqual(msg.price,Decimal('0.0550')) + self.assertEqual(msg.isBilled,False) + self.assertEqual(msg.recipient,str(Telnumber('123456789'))) + self.assertEqual(msg.date,datetime(2000, 1, 2, 3, 4, 5)) + self.assertEqual(msg.offer,o) + self.assertEqual(msg.exID,"12345678") + self.assertEqual(msg.count,1) + + + @patch("urllib.urlopen") + @inlineCallbacks + def testSmstradeException(self, mock_urlopen): + f = Mock() + f.readlines.return_value = ["703"] + mock_urlopen.return_value = f + + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="s", provider="bla", route="basic", typ="sms") + u.rights.append(Userright(o)) + + offer.providers["bla"] = Smstrade("bla") + offer.providers["bla"].key = "XXXXXX" + offer.offers["s"] = Offer("s",offer.providers["bla"],"basic","sms") + + j = yield exJobs.create(u,[Telnumber("0123456789")],SMS("bla"),['s'],'tesched') + t = Task(Telnumber("0123456789"),j) + yield t.start() + + errors = self.flushLoggedErrors(SmstradeException) + self.assertEqual(len(errors), 1) + self.assertEqual(self.log.e[0]['why'], "Error: Job(%s) to '0049123456789' failed."%j.dbjob) + + self.assertEqual(t.error, True) + self.assertEqual(str(t.status.value),str(SmstradeException(StatusCode(703)))) + + with self.session() as session: + u = session.merge(u) + o = session.merge(o) + job = u.job(j.dbjob) + self.assertEqual(job.status,"error") + self.assertEqual(len(job.messages),0) + + @patch("smtplib.SMTP") + @inlineCallbacks + def testSmtp(self, p_s ): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="s", provider="bla", route=None, typ="mail") + u.rights.append(Userright(o)) + + offer.providers["bla"] = SMTP("bla") + offer.providers["bla"].SSL = False + offer.providers["bla"].TLS = False + offer.providers["bla"].host = "localhost" + offer.providers["bla"].port = 12345 + offer.providers["bla"].user = "" + offer.providers["bla"].send_from = "frm@test.de" + offer.offers["s"] = Offer("s",offer.providers["bla"],None,"mail") + + j = yield exJobs.create(u,["t@test.de"],Mail("bla",'msg',None),['s'],'tesched') + t = Task("t@test.de",j) + yield t.start() + + self.assertEqual(self.log.e[0]['message'], ("Job(%s) to 't@test.de' ended sucecessfully via bla:None."%j.dbjob,)) + + with self.session() as session: + u = session.merge(u) + o = session.merge(o) + job = u.job(j.dbjob) + self.assertEqual(job.status,"sended") + self.assertEqual(job.info,"tesched") + self.assertEqual(len(job.messages),0) + + @patch("smtplib.SMTP") + @inlineCallbacks + def testSmtpException(self, p_s): + p_s.side_effect = IOError(111,"Connection refused") + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="s", provider="bla", route=None, typ="mail") + u.rights.append(Userright(o)) + + offer.providers["bla"] = SMTP("bla") + offer.providers["bla"].SSL = False + offer.providers["bla"].TLS = False + offer.providers["bla"].host = "localhost" + offer.providers["bla"].port = 12345 + offer.providers["bla"].user = "" + offer.providers["bla"].send_from = "frm@test.de" + offer.offers["s"] = Offer("s",offer.providers["bla"],None,"mail") + + j = yield exJobs.create(u,["t@test.de"],Mail("bla",'msg',None),['s'],'tesched') + t = Task("t@test.de",j) + yield t.start() + + errors = self.flushLoggedErrors(IOError) + self.assertEqual(len(errors), 1) + self.assertEqual(self.log.e[0]['why'], "Error: Job(%s) to 't@test.de' failed."%j.dbjob +) + self.assertEqual(t.error, True) + self.assertEqual(str(t.status.value),str(IOError(111,"Connection refused"))) + + with self.session() as session: + u = session.merge(u) + o = session.merge(o) + job = u.job(j.dbjob) + self.assertEqual(job.status,"error") + self.assertEqual(len(job.messages),0) + diff -r 448dd8d36839 -r 3929338fd17f iro/tests/reload.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/reload.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,14 @@ +from ..test_helpers.dbtestcase import DBTestCase + + +class TestReload(DBTestCase): + '''tests for reloading feature''' + + def testLog(self): + pass + testLog.todo = "reloading logfile is not implemented" + + def testOffers(self): + pass + testOffers.todo = "reloading offers test not implemented" + diff -r 448dd8d36839 -r 3929338fd17f iro/tests/smstrade.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/smstrade.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,130 @@ +from twisted.trial import unittest +from decimal import Decimal +from mock import patch, Mock + +from iro.error import NoRoute, NoTyp, NeededOption, RejectRecipient +from iro.telnumber import Telnumber +from iro.model.message import SMS +from iro.offer.smstrade import Smstrade, SmstradeException, StatusCode + + +HOST = "localhost" +PORT = 9999 + +class TestSMStradeProvider(unittest.TestCase): + + def getProvider(self, c=None): + _c={"key":"XXXXXXXX", + "typ":"smstrade", + } + + if c: + _c.update(c) + + ret = Smstrade("test") + ret.load(_c.items()) + return ret + + @patch("urllib.urlopen") + def testSendSMS(self,mock_urlopen): + f = Mock() + f.readlines.return_value = ["100","12345678","0.055","1"] + mock_urlopen.return_value = f + + params = ["key=XXXXXXXX","to=00491701234567", "message=Hello+World", "route=gold", "message_id=1", "cost=1","count=1",'charset=utf-8'] + params.sort() + + p=self.getProvider() + content = "Hello World" + r = p.send("gold", Telnumber("01701234567"), SMS(content,None)) + + ca = mock_urlopen.call_args[0] + c=ca[1].split("&") + c.sort() + + self.assertEqual(ca[0],"https://gateway.smstrade.de") + self.assertEqual(c,params) + self.assertEqual(f.readlines.call_count,1) + + self.assertEqual(r.provider, p) + self.assertEqual(r.route, 'gold') + self.assertEqual(r.costs, Decimal('0.055')) + self.assertEqual(r.exID, '12345678') + self.assertEqual(r.count, 1) + + def testStatusCode(self): + s = StatusCode(10,"12345678","1.10",1) + self.assertEqual(str(s),'10: Empfaengernummer nicht korrekt.') + self.assertEqual(int(s),10) + self.assertEqual(s.count,1) + self.assertEqual(s.costs,Decimal("1.10")) + self.assertEqual(s.exID,'12345678') + + + def testUnknownStatusCode(self): + s = StatusCode(999) + self.assertEqual(str(s),'999: unknown statuscode.') + self.assertEqual(int(s),999) + self.assertEqual(s.count,0) + self.assertEqual(s.costs,Decimal("0.00")) + self.assertEqual(s.exID, None) + + + + def testRejectRecipient(self): + p=self.getProvider() + content = "Hello World" + e = self.assertRaises(RejectRecipient, p.send, "basic", Telnumber("+331701234567"), SMS(content,None)) + self.assertEqual(str(e),'Reject recipient(00331701234567): None') + + @patch("urllib.urlopen") + def testRejectRecipient70(self,mock_urlopen): + f = Mock() + f.readlines.return_value = ["70"] + mock_urlopen.return_value = f + + p=self.getProvider() + content = "Hello World" + self.assertRaises(RejectRecipient, p.send , "basic", Telnumber("01701234567") ,SMS(content,None)) + + f.readlines.return_value = ["71"] + e = self.assertRaises(RejectRecipient, p.send , "basic", Telnumber("01701234567"), SMS(content,None)) + self.assertEqual(str(e),'Reject recipient(00491701234567): 71: Feature nicht ueber diese Route moeglich.') + + @patch("urllib.urlopen") + def testUnknwonStatuscode(self,mock_urlopen): + f = Mock() + f.readlines.return_value = ["703"] + mock_urlopen.return_value = f + + p=self.getProvider() + content = "Hello World" + e = self.assertRaises(SmstradeException, p.send , "basic", Telnumber("01701234567"), SMS(content,None)) + self.assertEqual(str(e),'950: Error in external API.\n703: unknown statuscode.') + + @patch("urllib.urlopen") + def testSmstradeException(self,mock_urlopen): + f = Mock() + f.readlines.return_value = ["10"] + mock_urlopen.return_value = f + + p=self.getProvider() + content = "Hello World" + e = self.assertRaises(SmstradeException, p.send , "basic", Telnumber("01701234567"), SMS(content,None)) + self.assertEqual(str(e),'950: Error in external API.\n10: Empfaengernummer nicht korrekt.') + + + + def testNeededOption(self): + s= self.getProvider() + self.assertEqual(s.key, "XXXXXXXX") + + self.assertRaises(NeededOption, s.load,[]) + + def testSendFunc(self): + s = self.getProvider() + p = s.getSendFunc("sms","basic") + self.assertEqual(p.func, s.send) + self.assertEqual(p.args, ("basic",)) + self.assertRaises(NoRoute,s.getSendFunc,"sms","foo") + self.assertRaises(NoTyp,s.getSendFunc,"mail2","basic") diff -r 448dd8d36839 -r 3929338fd17f iro/tests/smtp.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/smtp.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,146 @@ +from twisted.trial import unittest + +import email +from email.header import decode_header +import base64 +from ..test_helpers.smtp_helper import TestSMTPServer + +from mock import patch, Mock +import smtplib + +from iro.error import NoRoute, NoTyp, NeededOption +from iro.model.message import Mail +from iro.offer.smtp import SMTP + +HOST = "localhost" +PORT = 9999 + +class TestSMTPProvider(unittest.TestCase): + def setUp(self): + self.smtp_server = TestSMTPServer((HOST, PORT)) + self.smtp_server.start() + + def tearDown(self): + self.smtp_server.close() + + def getSMTP(self, c=None): + _c={"send_from":"send@t.de", + "host":HOST, + "port":PORT, + "typ":"smtp", + } + + if c: + _c.update(c) + + ret = SMTP("test") + ret.load(_c.items()) + return ret + + def testSendMail(self): + p=self.getSMTP() + content = "sadfadfgwertsdgsdf\n\nsdfgaerasdfsad\nadfasdf" + p.send("t@t.de", Mail("sub", content, None)) + + + self.assertEqual(len(self.smtp_server.rcvd), 1) + fromaddr, toaddrs, message = self.smtp_server.rcvd[0] + msg = email.message_from_string(message) + + self.assertEqual(fromaddr,"send@t.de") + self.assertEqual(msg.get_all("From"),["send@t.de"]) + self.assertEqual(toaddrs,["t@t.de"]) + self.assertEqual(msg.get_all("To"),["t@t.de"]) + self.assertEqual(decode_header(msg.get("Subject")),[("sub","utf-8")]) + self.assertEqual(base64.b64decode(msg.get_payload()),content) + + def testSendMailExtraFrm(self): + p=self.getSMTP() + content = "" + p.send("t@t.de", Mail("sub", content, "f@t.de")) + + self.assertEqual(len(self.smtp_server.rcvd), 1) + fromaddr, toaddrs, message = self.smtp_server.rcvd[0] + msg = email.message_from_string(message) + + self.assertEqual(fromaddr,"f@t.de") + self.assertEqual(msg.get_all("From"),["f@t.de"]) + + def testSendMailException(self): + p=self.getSMTP({"port":PORT-1}) + content = "" + self.assertRaises(IOError, p.send, "t@t.de", Mail("sub", content, "f@t.de")) + + self.assertEqual(len(self.smtp_server.rcvd), 0) + + @patch("smtplib.SMTP_SSL") + def testSSLSendMail(self,mock_ssl): + def se(*args): + return smtplib.SMTP(*args) + mock_ssl.side_effect=se + + p=self.getSMTP({"SSL":True}) + content = "sadfadfgwertsdgsdf\n\nsdfgaerasdfsad\nadfasdf" + p.send("t@t.de", Mail("sub", content, None)) + + self.assertEqual(mock_ssl.call_count,1) + + self.assertEqual(len(self.smtp_server.rcvd), 1) + + @patch("smtplib.SMTP") + def testTLSSendMail(self,mock_smtp): + mock_s = Mock() + mock_smtp.return_value = mock_s + + p=self.getSMTP({"TLS":True}) + content = "sadfadfgwertsdgsdf\n\nsdfgaerasdfsad\nadfasdf" + p.send("t@t.de", Mail("sub", content, None)) + + mock_s.starttls.assert_called_once_with() + self.assertEqual(mock_s.sendmail.call_count,1) + self.assertEqual([i[0] for i in mock_s.method_calls],["starttls","sendmail","quit"]) + + @patch("smtplib.SMTP") + def testLoginSendMail(self,mock_smtp): + mock_s = Mock() + mock_smtp.return_value = mock_s + + p=self.getSMTP({"user":"user","password":"pw"}) + content = "sadfadfgwertsdgsdf\n\nsdfgaerasdfsad\nadfasdf" + p.send("t@t.de", Mail("sub", content, None)) + + mock_s.login.assert_called_once_with("user","pw") + self.assertEqual(mock_s.sendmail.call_count,1) + self.assertEqual([i[0] for i in mock_s.method_calls],["login","sendmail","quit"]) + + + def testNeededOption(self): + c={"send_from":"send@t.de", + "host":HOST, + "port":PORT, + "user":"u", + "password":"p", + "typ":"smtp", + } + s = self.getSMTP(c) + self.assertEqual(s.send_from, "send@t.de") + self.assertEqual(s.host, HOST) + self.assertEqual(s.port, PORT) + self.assertEqual(s.user, "u") + self.assertEqual(s.password, "p") + self.assertEqual(s.SSL,False) + self.assertEqual(s.TLS,False) + + c.update({"TLS":True, "SSL":True}) + s = self.getSMTP(c) + self.assertEqual(s.SSL,True) + self.assertEqual(s.TLS,True) + + del c["host"] + self.assertRaises(NeededOption, s.load, c) + + def testSendFunc(self): + s = self.getSMTP() + self.assertEqual(s.getSendFunc("mail",None), s.send) + self.assertRaises(NoRoute,s.getSendFunc,"mail","foo") + self.assertRaises(NoTyp,s.getSendFunc,"mail2","foo") diff -r 448dd8d36839 -r 3929338fd17f iro/tests/task.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/task.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,95 @@ +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks + +from Queue import deque + +from iro.model.schema import User, Offer as DBOffer, Userright +from iro.model.message import SMS +from iro.model.status import Status +from iro.model.offer import offers +from iro.model.job import exJobs + +from iro.controller.task import createJob, Task +from iro.controller.pool import taskPool, dbPool + +from iro.offer import Offer, Provider + +from iro.error import NoRouteForTask +from iro.telnumber import Telnumber + +from ..test_helpers.dbtestcase import DBTestCase + +class TaskTestCase(DBTestCase): + def setUp(self): + DBTestCase.setUp(self) + dbPool.start(reactor) + + def tearDown(self): + exJobs.clear() + offers.clear() + dbPool.pool.stop() + taskPool.pool.q.queue = deque() + DBTestCase.tearDown(self) + +class TestTasks(TaskTestCase): + + @inlineCallbacks + def testCreateSMS(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + + job = yield createJob(u,[Telnumber('0123325456')],SMS('sms'),[]) + + self.assertEqual(taskPool.pool.q.qsize(),1) + + self.assertEqual(job.tasks.keys(),[Telnumber('0123325456')]) + self.assertIsInstance(job.tasks[Telnumber('0123325456')], Task) + + @inlineCallbacks + def testRun(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + o=DBOffer(name="test", provider="bla", route="basic", typ="sms") + u.rights.append(Userright(o)) + + p=Provider(name="p", typs={"sms":["test",]}) + def send(typ,route,recipient,message): + return Status(provider=p, route=route) + p.send=send + offers["test"] = Offer("test",provider=p, route="test", typ="sms") + + exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) + + task=Task(Telnumber('123456789'), exjob) + ret = yield task._run() + self.assertIsInstance(ret, Status) + self.assertEqual(ret.provider, p) + self.assertEqual(ret.route, "test") + + @inlineCallbacks + def testNoRoute(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + + exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), []) + + task=Task(Telnumber('123456789'), exjob) + d = task._run() + self.assertFailure(d, NoRouteForTask) + + def testSetStatus(self): + task=Task(Telnumber('123456789'), None) + self.assertEqual(task.status,None) + self.assertEqual(task.error,False) + + self.assertEqual(task.setStatus("fooja"),"fooja") + self.assertEqual(task.status,"fooja") + + def testSetError(self): + task=Task(Telnumber('123456789'), None) + self.assertEqual(task.setError("fooja"),"fooja") + self.assertEqual(task.status,"fooja") + self.assertEqual(task.error,True) diff -r 448dd8d36839 -r 3929338fd17f iro/tests/telnumber.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/telnumber.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,80 @@ +import unittest + +from iro.validate import vTel +from iro.error import InvalidTel +from iro.telnumber import Telnumber + +class testTelefonnumbers(unittest.TestCase): + """tests for telefonnumbers""" + + def testMultipleTelnumbers(self): + '''test the telefon validator''' + ret = vTel(["0123/456(78)","+4912346785433","00123435456-658"], None) + x=[Telnumber('+4912345678'),Telnumber('012346785433'),Telnumber('+123435456658')] + self.assertEqual(ret,x) + + def testInvalidTelnumbers(self): + '''invalid telnumbers''' + + numbers=['xa','+1','1-23',';:+0','0123'] + + for number in numbers: + with self.assertRaises(InvalidTel) as e: + vTel([number], None) + self.assertEqual(e.exception.number,number) + + with self.assertRaises(InvalidTel) as e: + vTel(['01234']+numbers, None) + self.assertEqual(e.exception.number,numbers[0]) + + def testDoubles(self): + ret = vTel(["0123/456(78)","+4912345678","004912345678"], None) + x=[Telnumber('+4912345678')] + self.assertEqual(ret,x) + + def equalNumber(self, tel1, tel2): + self.assertEqual(tel1.number, tel2.number) + self.assertEqual(tel1.land, tel2.land) + + def testWrongNumber(self): + telnum=Telnumber() + self.assertRaises(InvalidTel, telnum.createNumber, "hallo") + self.assertRaises(InvalidTel, telnum.createNumber, "0?242") + + def testNumber(self): + telnum=Telnumber("0345-94103640") + telnum2=Telnumber("+49345/94103640") + telnum3=Telnumber("00493459410364-0") + telnum4=Telnumber("+49(0)345-94103640") + + self.assertEqual(telnum.land, "49") + self.assertEqual(telnum.number, "34594103640") + + self.equalNumber(telnum, telnum2) + self.equalNumber(telnum, telnum3) + self.equalNumber(telnum, telnum4) + + def testEqual(self): + telnum=Telnumber("0345-94103640") + telnum2=Telnumber("+49345/94103640") + li=[] + self.assertEqual(telnum == telnum2, True) + self.assertEqual(telnum <> telnum2, False) + self.assertEqual(telnum, telnum2) + self.assertEqual(telnum in li,False) + li.append(telnum) + self.assertEqual(telnum in li,True) + self.assertEqual(telnum2 in li,True) + + def testHash(self): + telnum=Telnumber("0345-94103640") + self.assertEqual(hash(telnum),hash("004934594103640")) + self.assertNotEqual(hash(telnum),hash("004934594103641")) + + def testString(self): + telnum=Telnumber("0345-94103640") + self.assertEqual(str(telnum),"004934594103640") + + def testRepr(self): + telnum=Telnumber("0345-94103640") + self.assertEqual(repr(telnum),"") diff -r 448dd8d36839 -r 3929338fd17f iro/tests/test.pdf Binary file iro/tests/test.pdf has changed diff -r 448dd8d36839 -r 3929338fd17f iro/tests/validate.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/validate.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,135 @@ +from twisted.trial import unittest +from mock import Mock + +from iro.validate import vBool, vInteger, vHash, validate +from iro.error import ValidateException + +class testValidators(unittest.TestCase): + '''test for simple validators''' + + def testBool(self): + self.assertEqual(vBool(True,None),True) + self.assertEqual(vBool(1,None),True) + self.assertEqual(vBool("true",None),True) + self.assertEqual(vBool("True",None),True) + self.assertEqual(vBool("TRUE",None),True) + + self.assertEqual(vBool(False,None),False) + self.assertEqual(vBool(0,None),False) + self.assertEqual(vBool("false",None),False) + self.assertEqual(vBool("False",None),False) + self.assertEqual(vBool("FALSE",None),False) + + e = self.assertRaises(ValidateException, vBool, "TRue","test") + self.assertEqual(e.msg,"test is not boolean") + + def testInteger(self): + self.assertEqual(vInteger(123,None),123) + self.assertEqual(vInteger("123",None),123) + self.assertEqual(vInteger("-123",None),-123) + + self.assertRaises(ValidateException, vInteger, "a123",None) + + def testIntegerLimits(self): + self.assertEqual(vInteger(10,None,maxv=10),10) + self.assertRaises(ValidateException, vInteger, 11, None, maxv=10) + + self.assertEqual(vInteger(4,None,minv=4),4) + self.assertRaises(ValidateException, vInteger, 3, None, minv=4) + + self.assertRaises(ValidateException, vInteger, -1, None, minv=0) + self.assertRaises(ValidateException, vInteger, 1, None, maxv=0) + + def testIntegerNoneAllowed(self): + self.assertEqual(vInteger(None,None,none_allowed=True),None) + self.assertEqual(vInteger('',None,none_allowed=True),None) + + self.assertRaises(ValidateException, vInteger, "", None) + self.assertRaises(ValidateException, vInteger, None, None) + + def testHash(self): + self.assertEqual(vHash("0123456789abcdef",None),"0123456789abcdef") + self.assertEqual(vHash("0123456789ABCDEF",None),"0123456789abcdef") + self.assertEqual(vHash("F",None),"f") + self.assertEqual(vHash("",None),'') + + self.assertRaises(ValidateException, vHash, "GHIJKL", None) + + def testHashLimits(self): + self.assertEqual(vHash("F",None,minlength=1),"f") + self.assertRaises(ValidateException, vHash, "", None, minlength=1) + + self.assertEqual(vHash("Fa",None,maxlength=2),"fa") + self.assertRaises(ValidateException, vHash, "123", None, maxlength=1) + + + def testValidate(self): + f = Mock() + f.return_value = "valid" + @validate("t",f,True,1,2,3,4,k=5) + def g(u=False, t="bla"): + return t + d = g(t="uhuhu") + def r(t): + f.called_once_with("uhuhu","t",1,2,3,4,k=5) + self.assertEqual(t,"valid") + d.addCallback(r) + return d + + def testValidateMissingNeed(self): + f = Mock() + @validate("t",f,True,1,2,3,4,k=5) + def g(u, t="buuh"): + return t + e = self.assertRaises(ValidateException, g, u="uhuhu", t = None) + self.assertEqual(str(e),'700: t is nessasary') + + def testValidateMissingNeedNonExplicit(self): + f = Mock() + @validate("t",f,True,1,2,3,4,k=5) + def g(u, **k): + return k["t"] + e = self.assertRaises(ValidateException, g, u="uhuhu") + self.assertEqual(str(e),'700: t is nessasary') + + + def testValidateMissingNeed2(self): + f = Mock() + f.return_value = "valid" + @validate("t",f,True,1,2,3,4,k=5) + def g(u, t="buuh"): + return t + + d = g(True) + + def r(t): + f.called_once_with("buuh","t",1,2,3,4,k=5) + self.assertEqual(t,"valid") + d.addCallback(r) + return d + + def testvalidateNoNeed(self): + f = Mock() + f.return_value = "valid" + @validate("t",f,False,1,2,3,4,k=5) + def g(u, t="buuh"): + return t + d = g("uhu") + def r(t): + self.assertEqual(f.called,True) + self.assertEqual(t,"valid") + d.addCallback(r) + return d + + def testvalidateNoNeed2(self): + f = Mock() + f.return_value = "valid" + @validate("t",f,False,1,2,3,4,k=5) + def g(u, **k): + return k["t"] + d = g("uhu") + def r(t): + self.assertEqual(f.called,False) + self.assertEqual(t,None) + d.addCallback(r) + return d diff -r 448dd8d36839 -r 3929338fd17f iro/tests/viewinterface.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/viewinterface.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,269 @@ +from twisted.internet.defer import inlineCallbacks +from datetime import datetime +from Queue import deque + +from iro.model.schema import User, Offer, Userright, Job, Message +from iro.controller.viewinterface import Interface +from iro.controller.pool import taskPool + +from iro.model.message import SMS, Fax, Mail +from iro.model.pool import data +from iro.model.offer import offers +from iro.model.job import exJobs + +import iro.error as IroError + +from ..test_helpers.dbtestcase import DBTestCase + +class DummyPool(): + def run(self, f,*a,**k): + return f(*a,**k) + +class ViewInterfaceTest(DBTestCase): + """tests for the xmlrpc interface""" + def setUp(self): + DBTestCase.setUp(self) + self.pool = data.pool + data.pool = DummyPool() + + def tearDown(self): + exJobs.clear() + offers.clear() + taskPool.pool.q.queue = deque() + data.pool = self.pool + self.pool = None + DBTestCase.tearDown(self) + + @inlineCallbacks + def testStatus(self): + ''' test the status function''' + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(User(name='test',apikey='abcdef123456789')) + st = yield Interface().status('abcdef123456789') + self.assertEqual(st, {}) + + with self.session() as session: + u = session.merge(u) + j = Job(info='info', status="started") + j.user=u + session.add(j) + session.commit() + jid=j.id + status = {str(jid):{"status":"started"}} + st = yield Interface().status('abcdef123456789',jid) + self.assertEqual(st, status) + st = yield Interface().status('abcdef123456789') + self.assertEqual(st, status) + st = yield Interface().status('abcdef123456789', '', 'false') + self.assertEqual(st, status) + st = yield Interface().status('abcdef123456789', '', 0) + self.assertEqual(st, status) + + #JobNotFound + d = Interface().status('abcdef123456789',jid+1) + self.assertFailure(d, IroError.JobNotFound) + yield d + + #self.assertEqual(self.__rpc2().status('abcdef123456789','abcde', True), ["",'abcde', True]) + #self.assertEqual(self.__rpc2().status('abcdef123456789', '', 'true'), ["", '', True]) + #self.assertEqual(self.__rpc2().status('abcdef123456789', '', 1), ["", '', True]) + + def testNoSuchUser(self): + '''a unknown user should raise a UserNotNound Exception + bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception''' + d = Interface().status('abcdef123456789') + self.assertFailure(d, IroError.UserNotFound) + return d + + + def testValidationFault(self): + '''a validate Exception should be translated to a xmlrpclib.Fault.''' + d = Interface().status('xxx') + self.assertFailure(d, IroError.ValidateException) + + @inlineCallbacks + def testRoutes(self): + '''test the route function''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + r = yield Interface().routes('abcdef123456789','sms') + self.assertEqual(r, ['sipgate_basic']) + + d = Interface().routes('abcdef123456789','fax') + self.assertFailure(d,IroError.ValidateException) + yield d + + with self.session() as session: + o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") + u = session.query(User).filter_by(name="test").first() + u.rights.append(Userright(o)) + o=Offer(name="faxde", provider="faxde", route="", typ="fax") + session.add(o) + session.commit() + + r = yield Interface().routes('abcdef123456789','sms') + self.assertEqual(r, ['sipgate_basic','sipgate_plus']) + r = yield Interface().routes('abcdef123456789','fax') + self.assertEqual(r, []) + + + with self.session() as session: + u = session.query(User).filter_by(name="test").first() + u.rights.append(Userright(o)) + + r = yield Interface().routes('abcdef123456789','sms') + self.assertEqual(r, ['sipgate_basic','sipgate_plus']) + r = yield Interface().routes('abcdef123456789','fax') + self.assertEqual(r, ['faxde']) + + @inlineCallbacks + def testDefaultRoutes(self): + '''test the defaultRoute function''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o,True)) + o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + r = yield Interface().defaultRoute('abcdef123456789','sms') + self.assertEqual(r, ['sipgate_basic']) + + @inlineCallbacks + def testTelnumbers(self): + '''test the telefon validator''' + r = yield Interface().telnumber(["0123/456(78)","+4912346785433","00123435456-658"]) + self.assertEqual(r, True) + + invalid=['xa','+1','1-23',';:+0','0123'] + + d = Interface().telnumber(['01234']+invalid) + def x(failure): + self.assertEqual(failure.getErrorMessage(),"701: No valid telnumber: '%s'"%invalid[0]) + failure.raiseException() + d.addErrback(x) + self.assertFailure(d, IroError.InvalidTel) + yield d + + @inlineCallbacks + def testVaildEmail(self): + '''test vaild email adresses (got from wikipedia)''' + validmails=["niceandsimple@example.com"] + r = yield Interface().email(validmails) + self.assertEqual(r,True) + + def testInvaildEmail(self): + '''test invaild email adresses (got from wikipedia)''' + invalid=["Abc.example.com",] + d = Interface().email(invalid) + self.assertFailure(d, IroError.InvalidMail) + return d + + @inlineCallbacks + def testBill(self): + '''test bill function''' + apikey='abcdef123456789' + with self.session() as session: + u=User(name='test',apikey=apikey) + session.add(u) + + r = yield Interface().bill(apikey) + self.assertEqual(r,{'total':{'price':0.0,'anz':0}}) + + with self.session() as session: + u = session.merge(u) + o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") + u.rights.append(Userright(o)) + j = Job(info='i',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) + u.jobs.append(j) + + j = Job(info='a',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o)) + u.jobs.append(j) + + ret=yield Interface().bill(apikey) + self.assertEqual(ret['total'],{'price':0.8,'anz':2}) + self.assertEqual(ret['sipgate_basic'], + {'price':0.8,'anz':2, + 'info':{'i':{'price':0.4,'anz':1}, + 'a':{'price':0.4,'anz':1}, + } + }) + + @inlineCallbacks + def testSMS(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") + u.rights.append(Userright(o)) + session.add(u) + + jobid = yield Interface().sms('abcdef123456789','message',['0123325456'],['sipgate_basic']) + + with self.session() as session: + u = session.merge(u) + job = u.job(jobid) + exJob = job.extend + + self.assertEqual(exJob.message,SMS("message",None)) + self.assertEqual(taskPool.pool.q.qsize(),1) + + + @inlineCallbacks + def testMail(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + o = Offer(name='loc',provider="localhost",route="",typ="mail") + u.rights.append(Userright(o)) + session.add(u) + + jobid = yield Interface().mail('abcdef123456789','sub', "hey body!", ['t@te.de'], "frm@t.de" ,['loc']) + with self.session() as session: + u = session.merge(u) + job = u.job(jobid) + exJob = job.extend + + self.assertEqual(exJob.message,Mail("sub",'hey body!','frm@t.de')) + self.assertEqual(taskPool.pool.q.qsize(),1) + + @inlineCallbacks + def testMailFrmNone(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + o = Offer(name='loc',provider="localhost",route="",typ="mail") + u.rights.append(Userright(o)) + session.add(u) + + jobid = yield Interface().mail('abcdef123456789','sub', "hey body!", ['t@te.de'], None,['loc']) + with self.session() as session: + u = session.merge(u) + job = u.job(jobid) + exJob = job.extend + + self.assertEqual(exJob.message,Mail("sub",'hey body!',None)) + self.assertEqual(taskPool.pool.q.qsize(),1) + + + + @inlineCallbacks + def testFax(self): + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + o = Offer(name='b',provider="sipgate",route="b",typ="fax") + u.rights.append(Userright(o)) + session.add(u) + + jobid = yield Interface().fax('abcdef123456789','subject', 'blublbubblu',['0123325456'],['b']) + + with self.session() as session: + u = session.merge(u) + job = u.job(jobid) + exJob = job.extend + + self.assertEqual(exJob.message,Fax("subject","blublbubblu")) + self.assertEqual(taskPool.pool.q.qsize(),1) diff -r 448dd8d36839 -r 3929338fd17f iro/tests/xmlrpc.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/xmlrpc.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,216 @@ +from multiprocessing import Process +import unittest + +from datetime import datetime + +import time + +from xmlrpclib import Server as xServer, ServerProxy, Fault + +from iro.model.schema import User, Offer, Userright, Job, Message + +from iro.main import runReactor + +import iro.error as IroError + +from ..test_helpers.dbtestcase import DBTestCase + + +class XMLRPCTest(DBTestCase): + """tests for the xmlrpc interface""" + def setUp(self): + DBTestCase.setUp(self) + self.s = Process(target=startReactor, args=(self.engine,)) + self.s.start() + #the new process needs time to get stated, so this process has to sleep + time.sleep(.2) + + def tearDown(self): + self.__debug().stop() + time.sleep(.2) + self.s.join() + DBTestCase.tearDown(self) + + def __debug(self): + return xServer('http://localhost:7080/debug') + + def __rpc2(self): + return ServerProxy('http://localhost:7080/RPC2') + + def testDebugHello(self): + '''simple test for the connection to xmlrpc server''' + ret=self.__debug().hello() + self.failUnlessEqual(ret,'hello') + + def testListMethods(self): + '''list of all offical Methods, that can be executed''' + ret=self.__rpc2().listMethods() + self.failUnlessEqual(ret, ['listMethods', 'status', 'stop', 'sms', 'fax', 'mail', 'routes', 'defaultRoute', 'bill', 'telnumber','email']) + + def testStatus(self): + ''' test the status function''' + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(User(name='test',apikey='abcdef123456789')) + self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), {}) + + with self.session() as session: + u = session.merge(u) + j = Job(info='info', status="started") + j.user=u + session.add(j) + session.commit() + jid=j.id + status = {str(jid):{"status":"started"}} + self.failUnlessEqual(self.__rpc2().status('abcdef123456789',jid), status) + self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), status) + self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'false'), status) + self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 0), status) + + #JobNotFound + exc = self.assertRaises(Fault, self.__rpc2().status, 'abcdef123456789',jid+1) + unf = IroError.JobNotFound() + self.failUnlessEqual(exc.faultCode, unf.code) + self.failUnlessEqual(exc.faultString, unf.msg) + + #self.failUnlessEqual(self.__rpc2().status('abcdef123456789','abcde', True), ["",'abcde', True]) + #self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'true'), ["", '', True]) + #self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 1), ["", '', True]) + + def testNoSuchUser(self): + '''a unknown user should raise a UserNotNound Exception + bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception''' + exc = self.assertRaises(Fault, self.__rpc2().status, 'abcdef123456789') + unf=IroError.UserNotFound() + self.failUnlessEqual(exc.faultCode, unf.code) + self.failUnlessEqual(exc.faultString, unf.msg) + + def testNoSuchMethod(self): + '''a unknown mothod should raise a Exception ''' + exc = self.assertRaises(Fault, self.__rpc2().nosuchmethod) + self.failUnlessEqual(exc.faultCode, 8001) + self.failUnlessEqual(exc.faultString, "procedure nosuchmethod not found") + + def testValidationFault(self): + '''a validate Exception should be translated to a xmlrpclib.Fault.''' + exc = self.assertRaises(Fault, self.__rpc2().status,'xxx') + self.failUnlessEqual(exc.faultCode, 700) + self.failUnlessEqual(exc.faultString, "Validation of 'apikey' failed.") + + def testRoutes(self): + '''test the route function''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic']) + + exc = self.assertRaises(Fault, self.__rpc2().routes,'abcdef123456789','fax') + self.failUnlessEqual(exc.faultCode, 700) + self.failUnlessEqual(exc.faultString, "Typ is not valid.") + + with self.session() as session: + o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") + u = session.query(User).filter_by(name="test").first() + u.rights.append(Userright(o)) + o=Offer(name="faxde", provider="faxde", route="", typ="fax") + session.add(o) + session.commit() + self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus']) + self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),[]) + + with self.session() as session: + u = session.query(User).filter_by(name="test").first() + u.rights.append(Userright(o)) + + self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus']) + self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),['faxde']) + + def testDefaultRoutes(self): + '''test the defaultRoute function''' + with self.session() as session: + u=User(name='test',apikey='abcdef123456789') + o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") + u.rights.append(Userright(o,True)) + o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") + u.rights.append(Userright(o)) + session.add(u) + self.failUnlessEqual(self.__rpc2().defaultRoute('abcdef123456789','sms'),['sipgate_basic']) + + def testTelnumbers(self): + '''test the telefon validator''' + self.failUnlessEqual(self.__rpc2().telnumber(["0123/456(78)","+4912346785433","00123435456-658"]),True) + + invalid=['xa','+1','1-23',';:+0','0123'] + + exc = self.assertRaises(Fault, self.__rpc2().telnumber,['01234']+invalid) + self.failUnlessEqual(exc.faultCode, 701) + self.failUnlessEqual(exc.faultString, "No valid telnumber: '%s'" % invalid[0]) + + + def testVaildEmail(self): + '''test vaild email adresses (got from wikipedia)''' + validmails=["niceandsimple@example.com"] + self.failUnlessEqual(self.__rpc2().email(validmails),True) + + def testInvaildEmail(self): + '''test invaild email adresses (got from wikipedia)''' + invalid=["Abc.example.com",] + exc= self.assertRaises(Fault, self.__rpc2().email, invalid) + self.failUnlessEqual(exc.faultCode, 702) + self.failUnlessEqual(exc.faultString, "No valid email: '%s'" % invalid[0]) + + def testBill(self): + '''test bill function''' + apikey='abcdef123456789' + with self.session() as session: + u=User(name='test',apikey=apikey) + session.add(u) + + self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.0,'anz':0}}) + + with self.session() as session: + u = session.merge(u) + o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") + u.rights.append(Userright(o)) + j = Job(info='i',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) + u.jobs.append(j) + + j = Job(info='a',status='sended') + j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o)) + u.jobs.append(j) + + ret=self.__rpc2().bill(apikey) + self.failUnlessEqual(ret['total'],{'price':0.8,'anz':2}) + self.failUnlessEqual(ret['sipgate_basic'], + {'price':0.8,'anz':2, + 'info':{'i':{'price':0.4,'anz':1}, + 'a':{'price':0.4,'anz':1}, + } + }) + + +def startReactor(engine): + """starts the Rector with a special debug Clild, so that the reactor can be stopped remotly. """ + from twisted.internet import reactor + from twisted.web import xmlrpc, resource + + from iro.view.xmlrpc import appendResource + + class XMLRPCDebug(xmlrpc.XMLRPC): + def xmlrpc_stop(self): + reactor.callLater(0.1,reactor.stop) + return "" + + def xmlrpc_hello(self): + return "hello" + + root = resource.Resource() + root = appendResource(root) + root.putChild('debug', XMLRPCDebug()) + runReactor(reactor, engine, root) + +if __name__ == '__main__': + unittest.main() diff -r 448dd8d36839 -r 3929338fd17f iro/tests/xmlrpc_client.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/xmlrpc_client.py Sun Mar 18 14:06:27 2012 +0100 @@ -0,0 +1,12 @@ +from multiprocessing.pool import ThreadPool +import xmlrpclib +import timeit + + +def x(i): + xmlrpclib.ServerProxy('http://192.168.56.101:7080/RPC2').status('abcdef123456789') + +pool=ThreadPool(50) + +print min(timeit.repeat(lambda:pool.map(x,range(51)),number=1,repeat=1)) + diff -r 448dd8d36839 -r 3929338fd17f tests/__init__.py diff -r 448dd8d36839 -r 3929338fd17f tests/config.py --- a/tests/config.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,187 +0,0 @@ -from mock import patch, Mock -from twisted.trial import unittest -import signal -import io -import ConfigParser -from iro import config, error -from iro.offer.provider import Provider, providers - -class TestModuleConfig(unittest.TestCase): - '''test config class''' - - def setUp(self): - self._reloadList=config.configParser.reloadList - config.configParser.reloadList=[] - - def tearDown(self): - config.configParser.reloadList = self._reloadList - - @patch('iro.config.main') - @patch('iro.config.configParser') - def testReadConfig(self,pConfig,pMain): - pMain._init = True - config.readConfig() - self.assertEqual([i[0] for i in pConfig.method_calls],["read","reload","items"]) - pConfig.read.assert_called_once_with(config.confFiles) - pConfig.items.assert_called_once_with("main") - pConfig.reload.assert_called_once_with() - self.assertEqual(pMain.load.called,1) - - @patch('iro.config.main') - @patch('iro.config.configParser') - def testReadConfigInit(self,pConfig,pMain): - pConfig.items.return_value = [('dburl',''),("port",1000)] - pMain._init = False - config.readConfig() - self.assertEqual([i[0] for i in pConfig.method_calls],["read","reload","items"]) - self.assertEqual([i[0] for i in pMain.method_calls],["same"]) - sa = pMain.same.call_args_list[0][0][0] - self.assertEqual(sa.port,1000) - self.assertEqual(sa.dburl,'') - pMain.same.return_value = False - self.assertRaises(Exception,config.readConfig,) - - - - @patch('signal.signal') - @patch('iro.config.readConfig') - def testRegisterSignal(self, pReadConfig, pSignal): - config.registerSignal() - self.assertEqual(pSignal.call_args[0][0],signal.SIGUSR2) - self.assertEqual(pReadConfig.called,0) - pSignal.call_args[0][1](None, None) - pReadConfig.assert_called_once_with() - - def testRegisterReload(self): - def x(): - pass - config.configParser.registerReload(x) - self.assertEqual(config.configParser.reloadList,[x]) - - def testReload(self): - x = Mock() - config.configParser.reloadList = [x] - config.configParser.reload_() - x.assert_called_once_with() - - -class TestRead(unittest.TestCase): - - def tearDown(self): - for s in config.configParser.sections(): - config.configParser.remove_section(s) - - @patch('iro.config.ConfigParser.read') - def testMain(self,pRead): - sample_config = """[main] -port = 8000 -dburl = sdfdsafgsfdg -""" - config.configParser.readfp(io.BytesIO(sample_config)) - config.configParser.read([]) - pRead.assert_called_once_with(config.configParser,[]) - - @patch('iro.config.ConfigParser.read') - def testMainBadPort(self,pRead): - sample_config = """[main] -port = -8000 -dburl = sadfaserasg -""" - config.configParser.readfp(io.BytesIO(sample_config)) - self.assertRaises(error.ValidateException, config.configParser.read, []) - - @patch('iro.config.ConfigParser.read') - def testMainNoMust(self,pRead): - sample_config = """[main] -port = 8000 -""" - config.configParser.readfp(io.BytesIO(sample_config)) - self.assertRaises(config.NeededOption, config.configParser.read, []) - - @patch('iro.config.ConfigParser.read') - def testMust(self,pRead): - v=Mock() - config.main.options["test"] = config.Option(v,default="jehei") - try: - sample_config = """[main] -dburl = sdfawersdf -port = 8000 -""" - config.configParser.readfp(io.BytesIO(sample_config)) - config.configParser.read([]) - self.assertEqual(v.called,0) - config.main.load(config.configParser.items("main")) - self.assertEqual(config.main.test,'jehei') - sample_config = """[main] -dburl = adfgsdftsfg -port = 8000 -test = foohu - """ - config.configParser.readfp(io.BytesIO(sample_config)) - config.configParser.read([]) - v.assert_called_once_with("foohu","test") - finally: - del(config.main.options["test"]) - - @patch('iro.config.ConfigParser.read') - def testProviders(self, pRead): - v=Mock() - class TestProvider(Provider): - def __init__(self,name): - Provider.__init__(self,name) - self.options.update({"test":config.Option(v)}) - providers["test"]=TestProvider - try: - sample_config = """[p] -""" - config.configParser.readfp(io.BytesIO(sample_config)) - self.assertRaises(ConfigParser.NoOptionError, config.configParser.read, []) - self.assertEqual(v.called,0) - sample_config = """[p] -typ= test -test= foo -""" - config.configParser.readfp(io.BytesIO(sample_config)) - config.configParser.read([]) - v.assert_called_once_with("foo","test") - finally: - del(providers["test"]) - - -class TestConfig(unittest.TestCase): - def testSame(self): - c1 = config.Config("1") - c1.port = 1 - c1.dburl = "dburl1" - c2 = config.Config("2") - c2.port = 1 - c2.dburl = "dburl1" - self.assertTrue(c1.same(c2)) - self.assertTrue(c1.same(c1)) - self.assertTrue(c2.same(c1)) - c2.port = 2 - self.assertFalse(c2.same(c1)) - self.assertFalse(c1.same(c2)) - c2.port = 1 - c2.dburl = "dburl2" - self.assertFalse(c2.same(c1)) - self.assertFalse(c1.same(c2)) - - def testSampleConf(self): - c1 = config.Config("1") - self.assertEqual(c1.sampleConf(),["[1]", - "# Connection URL to database", - "dburl = ","", - "# Port under that twisted is running", - "port = ",""]) - - def testsampleConfDefault(self): - c1 = config.Config("1") - c1.options["port"].default = 12345 - c1.options["port"].must = False - c1.options["dburl"].default = True - self.assertEqual(c1.sampleConf(),["[1]", - "# Connection URL to database", - "dburl = True","", - "# Port under that twisted is running", - "# port = 12345",""]) diff -r 448dd8d36839 -r 3929338fd17f tests/db.py --- a/tests/db.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,286 +0,0 @@ -import unittest -from sqlalchemy.orm.exc import MultipleResultsFound -from datetime import datetime - -from iro.model.schema import User, Offer, Userright, Job, Message -from decimal import Decimal - -from .dbtestcase import DBTestCase - -class DBTests(DBTestCase): - """tests for the db model""" - - def testRoutes(self): - '''test routes''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.routes('sms').all(),[('sipgate_basic',),]) - - def testRoutesNoDefault(self): - '''test default routes''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.routes('sms',default=True).all(),[]) - - def testRoutesDefault(self): - '''test default routes''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o,default=1)) - session.add(u) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.routes('sms',default=True).all(),[('sipgate_basic',),]) - - def testRoutesDefaultOrdering(self): - '''test default routes ordering''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="s1", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o,default=3)) - o=Offer(name="s2", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o,default=None)) - o=Offer(name="s3", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o,default=1)) - session.add(u) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.routes('sms',default=True).all(),[('s3',),('s1',)]) - - def testProviders(self): - '''test providers''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.providers('sms').all(),[('sipgate',),]) - - def testTyps(self): - with self.session() as session: - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - session.add(o) - - with self.session() as session: - self.assertEqual(Offer.typs(session).all(),[('sms',),]) - - with self.session() as session: - o=Offer(name="s2", provider="sipgate", route="basic", typ="sms") - session.add(o) - o=Offer(name="s3", provider="sipgate", route="basic", typ="sms2") - session.add(o) - - with self.session() as session: - self.assertEqual(Offer.typs(session).order_by(Offer.typ).all(),[('sms',),('sms2',)]) - - def testOfferRoutes(self): - with self.session() as session: - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - session.add(o) - - with self.session() as session: - self.assertEqual([o.name for o in Offer.routes(session, "sms").order_by(Offer.name)],["sipgate_basic"]) - - with self.session() as session: - o=Offer(name="s2", provider="sipgate", route="basic", typ="sms") - session.add(o) - o=Offer(name="s3", provider="sipgate", route="basic", typ="sms2") - session.add(o) - - with self.session() as session: - self.assertEqual([o.name for o in Offer.routes(session, "sms2").order_by(Offer.name)],["s3"]) - self.assertEqual([o.name for o in Offer.routes(session, "sms").order_by(Offer.name)],["s2","sipgate_basic"]) - - def testOfferProviders(self): - with self.session() as session: - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - session.add(o) - - with self.session() as session: - self.assertEqual([o.provider for o in Offer.providers(session, "sms").order_by(Offer.provider)],["sipgate"]) - - with self.session() as session: - o=Offer(name="s2", provider="sipgate2", route="basic", typ="sms") - session.add(o) - o=Offer(name="s3", provider="sipgate", route="basic", typ="sms2") - session.add(o) - - with self.session() as session: - self.assertEqual([o.provider for o in Offer.providers(session, "sms2").order_by(Offer.provider)],["sipgate"]) - self.assertEqual([o.provider for o in Offer.providers(session, "sms").order_by(Offer.provider)],["sipgate","sipgate2"]) - -class Has_RightTests(DBTestCase): - '''test User.has_right''' - def testSimple(self): - '''test a very simple case''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.has_right("sms"),"sipgate_basic") - self.assertEqual(u.has_right("sms", offer_name="sipgate_basic"),"sipgate_basic") - self.assertEqual(u.has_right("sms", route="basic"),"sipgate_basic") - self.assertEqual(u.has_right("sms", provider="sipgate"),"sipgate_basic") - - def testDouble(self): - '''now we have different routes''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="b", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o)) - o=Offer(name="c", provider="sipgate", route="c", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.has_right("sms", offer_name="b"),"b") - self.assertEqual(u.has_right("sms", route="basic"),"b") - self.assertEqual(u.has_right("sms", offer_name="c"),"c") - self.assertEqual(u.has_right("sms", route="c"),"c") - self.assertRaises(MultipleResultsFound, u.has_right,"sms") - self.assertRaises(MultipleResultsFound, u.has_right,"sms", provider="sipgate") - - def testUnknown(self): - ''' a unknown typ''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="b", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o)) - o=Offer(name="c", provider="sipgate", route="c", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.has_right("fax", offer_name="b"), None) - self.assertEqual(u.has_right("fax"), None) - - - -class BillTest(DBTestCase): - """test the bill function""" - - def testBill(self): - '''test bill function''' - apikey='abcdef123456789' - with self.session() as session: - u=User(name='test',apikey=apikey) - session.add(u) - o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") - j = Job(info='i',status='sended') - m = Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.30, offer=o, job=j) - u.rights.append(Userright(o)) - u.jobs.append(j) - session.add(m) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.rights[0].bill.all(),[(1L,Decimal('0.3000'),'i')]) - - def testBillAgreget(self): - apikey='abcdef123456789' - with self.session() as session: - u=User(name='test',apikey=apikey) - session.add(u) - o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") - j = Job(info='i',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.30, offer=o)) - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) - u.rights.append(Userright(o)) - u.jobs.append(j) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.rights[0].bill.all(),[(2L,Decimal('0.7000'),'i')]) - - def testBillIsBilled(self): - apikey='abcdef123456789' - with self.session() as session: - u=User(name='test',apikey=apikey) - session.add(u) - o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") - j = Job(info='i',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=True, date=datetime.now() , price=0.30, offer=o)) - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) - u.rights.append(Userright(o)) - u.jobs.append(j) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.rights[0].bill.all(),[(1L,Decimal('0.4000'),'i')]) - - def testBillMultipleJobs(self): - apikey='abcdef123456789' - with self.session() as session: - u=User(name='test',apikey=apikey) - session.add(u) - o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") - u.rights.append(Userright(o)) - - j = Job(info='i',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) - u.jobs.append(j) - - j = Job(info='a',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o)) - u.jobs.append(j) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.rights[0].bill.order_by(Job.info).all(), - [(1L,Decimal('0.4000'),'a'), - (1L,Decimal('0.4000'),'i') - ]) - - def testBillMultipleOffers(self): - apikey='abcdef123456789' - with self.session() as session: - u=User(name='test',apikey=apikey) - session.add(u) - o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") - u.rights.append(Userright(o)) - - j = Job(info='a',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o)) - u.jobs.append(j) - - o = Offer(name='sipgate_gold',provider="sipgate",route="gold",typ="sms") - u.rights.append(Userright(offer=o)) - - j = Job(info='a',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.5, offer=o)) - u.jobs.append(j) - - with self.session() as session: - u=session.merge(u) - self.assertEqual(u.rights[0].bill.all(), - [(1L,Decimal('0.4000'),'a')]) - - self.assertEqual(u.rights[1].bill.all(),[(1L,Decimal('0.5000'),'a')]) - - -if __name__ == '__main__': - unittest.main() diff -r 448dd8d36839 -r 3929338fd17f tests/email_validate.py --- a/tests/email_validate.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,65 +0,0 @@ -import unittest - -from iro.validate import vEmail -from iro.error import InvalidMail, ValidateException - - -class testEmail(unittest.TestCase): - """tests for email adresses""" - - def testVaildEmail(self): - '''test vaild email adresses (got from wikipedia)''' - validmails=["niceandsimple@example.com", - "simplewith+symbol@example.com", - 'a.little.unusual@example.com', - 'a.little.more.unusual@dept.example.com', - '@[10.10.10.10]', - '@[1.1.1.1]', - '@[200.100.100.100]', - 'user@[2001:db8:1ff::a0b:dbd0]', - '"much.more\ unusual"@example.com', - 't."test".t@example.com', - '"very.unusual.@.unusual.com"@example.com', - '"very.(),:;<>[]\\".VERY.\\"very@\\\ \\"very\\".unusual"@strange.example.com', - "!#$%&'*+-/=?^_`{}|~@example.org", - '"()<>[]:;@,\\\"!#$%&\'*+-/=?^_`{}| ~ ? ^_`{}|~."@example.org', - '""@example.org'] - - for num in validmails: - self.failUnlessEqual(vEmail([num],None),[num]) - - def testInvaildEmail(self): - '''test invaild email adresses (got from wikipedia)''' - invalid=["Abc.example.com", # (an @ character must separate the local and domain parts) - "Abc.@example.com", # (character dot(.) is last in local part) - "Abc..123@example.com", # (character dot(.) is double) - "A@b@c@example.com", # (only one @ is allowed outside quotation marks) - 'a"b(c)d,e:f;gi[j\k]l@example.com', # (none of the special characters in this local part is allowed outside quotation marks) - 'just"not"right@example.com', # (quoted strings must be dot separated, or the only element making up the local-part) - 'thisis."notallowed@example.com', # (spaces, quotes, and backslashes may only exist when within quoted strings and preceded by a slash) - 'this\\ still\\"not\\allowed@example.com', # (even if escaped (preceded by a backslash), spaces, quotes, and backslashes must still be contained by quotes) - 't."test".t"test".t@example.com', - 't."test"t."test".t@example.com', - ] - - for number in invalid: - with self.assertRaises(InvalidMail) as e: - vEmail([number],None) - self.failUnlessEqual(e.exception.number, number) - - def testInvalidDomain(self): - '''invalid Domainname''' - with self.assertRaises(InvalidMail): - vEmail(['x@&.de'],None) - - def testDoubles(self): - self.assertEqual(vEmail(['x@test.de','x@test.de'],None),["x@test.de"]) - - def testString(self): - self.assertEqual(vEmail('x@test.de', None),"x@test.de") - - def testAllowString(self): - self.assertRaises(ValidateException,vEmail, 'x@test.de', None, allowString=False) - - def testAllowList(self): - self.assertRaises(ValidateException,vEmail, ['x@test.de'], None, allowList=False) diff -r 448dd8d36839 -r 3929338fd17f tests/install.py --- a/tests/install.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,110 +0,0 @@ -#from mock import patch, Mock -from twisted.trial import unittest -from twisted.python import log -from sqlalchemy import create_engine -import os - -from iro import install -from iro import config -from iro.model.schema import Base - -from .dbtestcase import md, SampleDatabase - -class DummyObserver(object): - def __init__(self): - self.e=[] - - def start(self): - log.addObserver(self.emit) - - def stop(self): - log.removeObserver(self.emit) - - def emit(self, eventDict): - self.e.append(eventDict) - - -class TestInstallation(unittest.TestCase): - '''test install script''' - - def setUp(self): - md.setUp() - if not hasattr(md,"db2"): - md.db2=SampleDatabase("test2","test2",'%s/my.cnf'%md.tdir) - md.dburl2='mysql://test2:test@localhost/test2?unix_socket=%s/socket'%md.tdir - md.db2.create() - self.log = DummyObserver() - self.log.start() - self.engine = None - - def tearDown(self): - self.log.stop() - if self.engine: - Base.metadata.drop_all(self.engine) - self.engine = None - try: - os.remove("iro.conf") - except OSError as e: - if e.errno != 2: - raise - - - - def testCheckConfig(self): - self.assertEqual(install.checkConfig(),False) - with open("iro.conf",'w') as fp: - fp.write("""[main] -dburl=foo""") - self.assertEqual(install.checkConfig(),False) - self.assertEqual(len(self.log.e),1) - self.assertEqual(self.log.e[0]['message'], ("Error while processing config file: Option 'port' in section 'main' is missing.",)) - with open("iro.conf",'w') as fp: - fp.write("""[main] -dburl=foo -port=123456 -""") - self.assertEqual(install.checkConfig(),True) - - def testCheckDatabase(self): - config.main.dburl=md.dburl2 - self.assertTrue(install.checkDatabaseConnection()) - self.assertFalse(install.checkDatabase()) - self.engine = create_engine(md.dburl2) - Base.metadata.create_all(self.engine) - self.assertTrue(install.checkDatabase()) - - def testCheckDatabaseConnection(self): - config.main.dburl="mysql://t@localhost/test" - self.assertFalse(install.checkDatabaseConnection()) - self.assertEqual(len(self.log.e),1) - self.assertTrue(self.log.e[0]['message'][0].startswith("Error while trying to connect to database\n")) - config.main.dburl="sqlite://" - self.assertTrue(install.checkDatabaseConnection()) - - - def testCreateDatabase(self): - config.main.dburl=md.dburl2 - self.assertTrue(install.checkDatabaseConnection()) - self.assertFalse(install.checkDatabase()) - install.createDatabase() - self.assertTrue(install.checkDatabase()) - - def testCreateSampleConfig(self): - install.createSampleConfig() - with open("iro.conf",'r') as fp: - c = fp.read() - with open(os.path.abspath("../iro.conf.inst"),"r") as fp: - self.assertEqual(c,fp.read()) - - def testNoOverwrite(self): - with open("iro.conf","w") as fp: - fp.write("muhaha") - install.createSampleConfig() - with open("iro.conf",'r') as fp: - self.assertEqual(fp.read(),"muhaha") - self.assertEqual(len(self.log.e),1) - self.assertEqual(self.log.e[0]['message'], ("iro.conf exists and will not be overwritten.",)) - - def testCheck(self): - pass - testCheck.todo = "to implement" diff -r 448dd8d36839 -r 3929338fd17f tests/job.py --- a/tests/job.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,194 +0,0 @@ -from datetime import datetime -from decimal import Decimal -from mock import patch - -from iro.model.job import exJobs, ExJob -from iro.model.pool import data -from iro.model.message import SMS -from iro.model.status import Status -from iro.model.schema import Job, User, Offer as DBOffer, Userright - -from iro.controller.task import Task - -from iro.offer.provider import Provider - -from iro.telnumber import Telnumber -from iro.validate import vInteger - -from .dbtestcase import DBTestCase - -class DummyPool(): - def run(self, f,*a,**k): - return f(*a,**k) - -from twisted.python import log - -class DummyObserver(object): - def __init__(self): - self.e=[] - - def start(self): - log.addObserver(self.emit) - - def stop(self): - log.removeObserver(self.emit) - - def emit(self, eventDict): - self.e.append(eventDict) - -class JobTestCase(DBTestCase): - def setUp(self): - DBTestCase.setUp(self) - self.pool = data.pool - data.pool = DummyPool() - - def tearDown(self): - exJobs.clear() - data.pool = self.pool - self.pool = None - DBTestCase.tearDown(self) - - -class exJobsTest(JobTestCase): - '''tests for exJobs''' - - def testCreate(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - - job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) - self.assertIsInstance(job, ExJob) - self.assertTrue(vInteger(job.dbjob, None, minv=0 )) - self.assertEqual(job.message, SMS('test')) - self.assertEqual(job.recipients, [Telnumber('123456789')]) - self.assertEqual(job.offers,[]) - self.assertEqual(job.tasks,{}) - - with self.session() as session: - j = session.query(Job.id).all() - self.assertEqual(j,[(job.dbjob,)]) - - self.assertEqual(exJobs[job.dbjob],job) - - def testCreate2(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="test", provider="bla", route="basic", typ="sms") - u.rights.append(Userright(o)) - - job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) - self.assertEqual(job.offers,['test']) - - def testGet(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - - job = ExJob(None, [Telnumber('123456789')], SMS('test'), ['test']) - exJobs[1] = job - - self.assertEqual(len(exJobs), 1) - self.assertEqual(job, exJobs[1]) - - def testGetFromDB(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - job = Job( info="info", status="started") - u.jobs.append(job) - session.add(u) - - with self.session() as session: - job = session.merge(job) - u = session.merge(u) - ejob= ExJob(job.id, [Telnumber('123456789')], SMS('test'), ['test']) - exJobs[job.id]=ejob - self.assertEqual(job.extend, ejob) - self.assertEqual(u.jobs[0].extend,ejob) - - def testUnknownExJob(self): - self.assertRaises(KeyError,exJobs.__getitem__,'a1234567890') - - -class StatiTest(JobTestCase): - def setUp(self): - JobTestCase.setUp(self) - self.log = DummyObserver() - self.log.start() - - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="test", provider="bla", route="a", typ="sms") - u.rights.append(Userright(o)) - - self.user = u - self.offer = o - - self.provider=Provider("bla", {"sms":["a","b","c"]}) - self.job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), []) - - def tearDown(self): - self.log.stop() - JobTestCase.tearDown(self) - - - def testSetError(self): - self.job.setError(Task(Telnumber('123456789'),self),Exception("muhaha")) - errors = self.flushLoggedErrors(Exception) - self.assertEqual(len(errors), 1) - self.assertEqual(self.log.e[0]['why'], "Error: Job(%s) to '0049123456789' failed."%self.job.dbjob) - - with self.session() as session: - u = session.merge(self.user) - job = u.job(self.job.dbjob) - self.assertEqual(job.status,"error") - - def testSetStatus(self): - task = Task(Telnumber('123456789'),self.job) - status = Status(self.provider,"a") - self.job.setStatus(task, status) - - self.assertEqual(self.log.e[0]['message'], ("Job(%s) to '0049123456789' ended sucecessfully via bla:a."%self.job.dbjob,)) - - with self.session() as session: - u = session.merge(self.user) - job = u.job(self.job.dbjob) - self.assertEqual(job.status,"sended") - self.assertEqual(len(job.messages),0) - - def testMultipleRecipients(self): - self.job.recipients.append(Telnumber("01234567890")) - task = Task(Telnumber('123456789'),self.job) - status = Status(self.provider,"a") - self.job.setStatus(task, status) - - with self.session() as session: - u = session.merge(self.user) - job = u.job(self.job.dbjob) - self.assertEqual(job.status,"sending") - - @patch("iro.model.job.datetime") - def testCosts(self,p_dt): - p_dt.today.return_value = datetime(2000, 1, 2, 3, 4, 5) - task = Task(Telnumber('123456789'),self.job) - status = Status(self.provider,"a",costs=0.055,exID="12345678",count=1) - - self.job.setStatus(task, status) - - with self.session() as session: - u = session.merge(self.user) - o = session.merge(self.offer) - job = u.job(self.job.dbjob) - self.assertEqual(job.status,"sended") - self.assertEqual(len(job.messages),1) - - msg = job.messages[0] - self.assertEqual(msg.price,Decimal('0.0550')) - self.assertEqual(msg.isBilled,False) - self.assertEqual(msg.recipient,str(Telnumber('123456789'))) - self.assertEqual(msg.date,datetime(2000, 1, 2, 3, 4, 5)) - self.assertEqual(msg.offer,o) - self.assertEqual(msg.exID,"12345678") - self.assertEqual(msg.count,1) diff -r 448dd8d36839 -r 3929338fd17f tests/model_validate.py --- a/tests/model_validate.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,57 +0,0 @@ -from iro.model.schema import Offer -from iro.model.decorators import vRoute, vTyp -from iro.model.pool import data - -from iro.error import ValidateException - -from .dbtestcase import DBTestCase - -class DummyPool(): - def run(self, f,*a,**k): - return f(*a,**k) - -class ModelVaidatorTest(DBTestCase): - """tests for the model vaidators""" - def setUp(self): - DBTestCase.setUp(self) - self.pool = data.pool - data.pool = DummyPool() - - def tearDown(self): - data.pool = self.pool - self.pool = None - DBTestCase.tearDown(self) - - def testTyp(self): - with self.session() as session: - session.add(Offer(name="t",provider="p",typ="type")) - - with self.session() as session: - self.assertEqual(vTyp("type",None),"type") - e = self.assertRaises(ValidateException,vTyp, "sss", None) - self.assertEqual(str(e),'700:Typ sss is not valid.') - - def testRoute(self): - with self.session() as session: - session.add(Offer(name="t",provider="p",typ="type")) - self.assertEqual(vRoute("t",None,typ="type"),"t") - self.assertEqual(vRoute(["t","t"],None,typ="type"),["t"]) - e = self.assertRaises(ValidateException,vRoute, "s", None, typ="type") - self.assertEqual(str(e),'700:Route s is not valid.') - - def testRouteAllow(self): - with self.session() as session: - session.add(Offer(name="t",provider="p",typ="type")) - e = self.assertRaises(ValidateException,vRoute, "t", "foo", typ="type", allowString=False) - self.assertEqual(str(e),'700:foo must be a list of routes.') - e = self.assertRaises(ValidateException,vRoute, ["t"], "foo", typ="type", allowList=False) - self.assertEqual(str(e),'700:foo must be a route - No list of routes.') - - def testRouteProvider(self): - with self.session() as session: - session.add(Offer(name="t",provider="p",typ="type")) - self.assertEqual(vRoute("p",None,typ="type"),"p") - - def testRouteDefault(self): - self.assertEqual(vRoute("default",None,typ="type"),"default") - diff -r 448dd8d36839 -r 3929338fd17f tests/offer.py --- a/tests/offer.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,156 +0,0 @@ -from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks - -import io - -from iro.model import offer -from iro.config import configParser -from iro.offer import Offer, providers as OfferProviders, Provider -from iro.model.schema import User, Offer as DBOffer, Userright -from iro.controller.pool import dbPool -from iro.error import NoProvider -from .dbtestcase import DBTestCase - -class TestOffers(DBTestCase): - '''test config class''' - def setUp(self): - DBTestCase.setUp(self) - dbPool.start(reactor) - - def tearDown(self): - for s in configParser.sections(): - configParser.remove_section(s) - - dbPool.pool.stop() - offer.offers.clear() - - try: - del(OfferProviders["test"]) - except KeyError: - pass - - offer.providers.clear() - DBTestCase.tearDown(self) - - def testReloadList(self): - '''test if loadOffers will be fired by reloading Config''' - self.assertTrue(offer.loadOffers in configParser.reloadList) - - @inlineCallbacks - def testExtendProviderUnknown(self): - '''test the extendProvider Function (route unknown)''' - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - - ret = yield offer.extendProvider(u, "sms", ["blub"]) - self.assertEqual(ret, []) - - @inlineCallbacks - def testExtendProviderKnown(self): - '''test the extendProvider Function (route known)''' - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="blub", provider="bla", route="basic", typ="sms") - u.rights.append(Userright(o)) - - ret = yield offer.extendProvider(u, "sms", ["blub"]) - self.assertEqual(ret, ["blub"]) - - ret = yield offer.extendProvider(u, "sms", ["blub", "blub"]) - self.assertEqual(ret, ["blub"]) - - @inlineCallbacks - def testExtendProviderProvider(self): - '''test the extendProvider Function (provider known)''' - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="oh", provider="bla", route="a", typ="sms") - u.rights.append(Userright(o)) - - offer.providers={"bla":Provider("bla", {"sms":["a","b","c"]})} - - for l in [['bla'],['oh'],['oh','bla'],['bla','oh']]: - ret = yield offer.extendProvider(u, "sms", l) - self.assertEqual(ret, ["oh"]) - - @inlineCallbacks - def testExtendProviderDouble(self): - '''test the extendProvider Function (provider and name doubles)''' - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="oh", provider="bla", route="a", typ="sms") - u.rights.append(Userright(o)) - o=DBOffer(name="a", provider="bla", route="b", typ="sms") - u.rights.append(Userright(o)) - - offer.providers={"bla":Provider("bla", {"sms":["a","b","c"]})} - - ret = yield offer.extendProvider(u, "sms", ["bla"]) - self.assertEqual(ret, ["oh","a"]) - - ret = yield offer.extendProvider(u, "sms", ["a","bla"]) - self.assertEqual(ret, ["a","oh"]) - - ret = yield offer.extendProvider(u, "sms", ["bla", "a"]) - self.assertEqual(ret, ["oh","a"]) - - @inlineCallbacks - def testExtendProviderDefault(self): - '''test the extendProvider Function (default)''' - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="blub3", provider="bla", route="basic", typ="sms") - u.rights.append(Userright(o,default=2)) - o=DBOffer(name="blub", provider="bla", route="basic", typ="sms") - u.rights.append(Userright(o,default=1)) - o=DBOffer(name="blub2", provider="bla2", route="basic", typ="sms") - u.rights.append(Userright(o)) - o=DBOffer(name="blub4", provider="bla", route="basic", typ="sms") - u.rights.append(Userright(o,default=3)) - - ret = yield offer.extendProvider(u, "sms", ["default"]) - self.assertEqual(ret, ["blub","blub3",'blub4']) - - ret = yield offer.extendProvider(u, "sms", "default") - self.assertEqual(ret, ["blub","blub3", 'blub4']) - - - @inlineCallbacks - def testLoadOffers(self): - - class TestProvider(Provider): - def __init__(self,name): - Provider.__init__(self,name,{"sms":["a",]}) - - with self.session() as session: - session.add(DBOffer(name="oh", provider="p", route="a", typ="sms")) - - sample_config = """[p] -typ = test -""" - configParser.readfp(io.BytesIO(sample_config)) - OfferProviders["test"]=TestProvider - yield offer.loadOffers() - self.assertEqual(offer.offers.keys(),["oh"]) - self.assertEqual(offer.providers.keys(),["p"]) - self.assertEqual(offer.providers["p"].name,"p") - self.assertEqual(offer.providers["p"].typ,"test") - self.assertEqual(offer.offers["oh"],Offer(provider=offer.providers["p"], name="oh", route="a", typ="sms")) - - def testLoadOffersUnknown(self): - with self.session() as session: - session.add(DBOffer(name="oh", provider="p", route="a", typ="sms")) - - sample_config = """[p] -typ = unknown -""" - configParser.readfp(io.BytesIO(sample_config)) - d = offer.loadOffers() - self.assertFailure(d, NoProvider) - return d - diff -r 448dd8d36839 -r 3929338fd17f tests/offer_integrated.py --- a/tests/offer_integrated.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,215 +0,0 @@ -from twisted.internet.defer import inlineCallbacks - -from datetime import datetime -from decimal import Decimal -from mock import patch, Mock - -from iro.model.job import exJobs -from iro.model.pool import data -from iro.model.schema import User, Offer as DBOffer, Userright -from iro.model.message import SMS, Mail -from iro.model import offer - -from iro.controller.task import Task, taskPool -from iro.telnumber import Telnumber - -from iro.offer import Smstrade, SMTP, Offer -from iro.offer.smstrade import SmstradeException, StatusCode - -from .dbtestcase import DBTestCase - -class DummyPool(): - def run(self, f,*a,**k): - return f(*a,**k) - -def run( f,*a,**k): - return f(*a,**k) - -from twisted.python import log - -class DummyObserver(object): - def __init__(self): - self.e=[] - - def start(self): - log.addObserver(self.emit) - - def stop(self): - log.removeObserver(self.emit) - - def emit(self, eventDict): - self.e.append(eventDict) - - - -class IntegratedOfferTests(DBTestCase): - def setUp(self): - DBTestCase.setUp(self) - self.pool = data.pool - data.pool = DummyPool() - - self.taskPool = taskPool.run - taskPool.run = run - - self.log = DummyObserver() - self.log.start() - - def tearDown(self): - self.log.stop() - exJobs.clear() - data.pool = self.pool - self.pool = None - taskPool.run = self.taskPool - self.taskPool = None - DBTestCase.tearDown(self) - - - @patch("iro.model.job.datetime") - @patch("urllib.urlopen") - @inlineCallbacks - def testSmstrade(self, p_u, p_dt): - f = Mock() - f.readlines.return_value = ["100","12345678","0.055","1"] - p_u.return_value = f - - p_dt.today.return_value = datetime(2000, 1, 2, 3, 4, 5) - - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="s", provider="bla", route="basic", typ="sms") - u.rights.append(Userright(o)) - - - offer.providers["bla"] = Smstrade("bla") - offer.providers["bla"].key = "XXXXXX" - offer.offers["s"] = Offer("s",offer.providers["bla"],"basic","sms") - - j = yield exJobs.create(u,[Telnumber("0123456789")],SMS("bla"),['s'],'tesched') - t = Task(Telnumber("0123456789"),j) - yield t.start() - - self.assertEqual(self.log.e[0]['message'], ("Job(%s) to '0049123456789' ended sucecessfully via bla:basic."%j.dbjob,)) - - with self.session() as session: - u = session.merge(u) - o = session.merge(o) - job = u.job(j.dbjob) - self.assertEqual(job.status,"sended") - self.assertEqual(job.info,"tesched") - self.assertEqual(len(job.messages),1) - - msg = job.messages[0] - self.assertEqual(msg.price,Decimal('0.0550')) - self.assertEqual(msg.isBilled,False) - self.assertEqual(msg.recipient,str(Telnumber('123456789'))) - self.assertEqual(msg.date,datetime(2000, 1, 2, 3, 4, 5)) - self.assertEqual(msg.offer,o) - self.assertEqual(msg.exID,"12345678") - self.assertEqual(msg.count,1) - - - @patch("urllib.urlopen") - @inlineCallbacks - def testSmstradeException(self, mock_urlopen): - f = Mock() - f.readlines.return_value = ["703"] - mock_urlopen.return_value = f - - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="s", provider="bla", route="basic", typ="sms") - u.rights.append(Userright(o)) - - offer.providers["bla"] = Smstrade("bla") - offer.providers["bla"].key = "XXXXXX" - offer.offers["s"] = Offer("s",offer.providers["bla"],"basic","sms") - - j = yield exJobs.create(u,[Telnumber("0123456789")],SMS("bla"),['s'],'tesched') - t = Task(Telnumber("0123456789"),j) - yield t.start() - - errors = self.flushLoggedErrors(SmstradeException) - self.assertEqual(len(errors), 1) - self.assertEqual(self.log.e[0]['why'], "Error: Job(%s) to '0049123456789' failed."%j.dbjob) - - self.assertEqual(t.error, True) - self.assertEqual(str(t.status.value),str(SmstradeException(StatusCode(703)))) - - with self.session() as session: - u = session.merge(u) - o = session.merge(o) - job = u.job(j.dbjob) - self.assertEqual(job.status,"error") - self.assertEqual(len(job.messages),0) - - @patch("smtplib.SMTP") - @inlineCallbacks - def testSmtp(self, p_s ): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="s", provider="bla", route=None, typ="mail") - u.rights.append(Userright(o)) - - offer.providers["bla"] = SMTP("bla") - offer.providers["bla"].SSL = False - offer.providers["bla"].TLS = False - offer.providers["bla"].host = "localhost" - offer.providers["bla"].port = 12345 - offer.providers["bla"].user = "" - offer.providers["bla"].send_from = "frm@test.de" - offer.offers["s"] = Offer("s",offer.providers["bla"],None,"mail") - - j = yield exJobs.create(u,["t@test.de"],Mail("bla",'msg',None),['s'],'tesched') - t = Task("t@test.de",j) - yield t.start() - - self.assertEqual(self.log.e[0]['message'], ("Job(%s) to 't@test.de' ended sucecessfully via bla:None."%j.dbjob,)) - - with self.session() as session: - u = session.merge(u) - o = session.merge(o) - job = u.job(j.dbjob) - self.assertEqual(job.status,"sended") - self.assertEqual(job.info,"tesched") - self.assertEqual(len(job.messages),0) - - @patch("smtplib.SMTP") - @inlineCallbacks - def testSmtpException(self, p_s): - p_s.side_effect = IOError(111,"Connection refused") - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="s", provider="bla", route=None, typ="mail") - u.rights.append(Userright(o)) - - offer.providers["bla"] = SMTP("bla") - offer.providers["bla"].SSL = False - offer.providers["bla"].TLS = False - offer.providers["bla"].host = "localhost" - offer.providers["bla"].port = 12345 - offer.providers["bla"].user = "" - offer.providers["bla"].send_from = "frm@test.de" - offer.offers["s"] = Offer("s",offer.providers["bla"],None,"mail") - - j = yield exJobs.create(u,["t@test.de"],Mail("bla",'msg',None),['s'],'tesched') - t = Task("t@test.de",j) - yield t.start() - - errors = self.flushLoggedErrors(IOError) - self.assertEqual(len(errors), 1) - self.assertEqual(self.log.e[0]['why'], "Error: Job(%s) to 't@test.de' failed."%j.dbjob -) - self.assertEqual(t.error, True) - self.assertEqual(str(t.status.value),str(IOError(111,"Connection refused"))) - - with self.session() as session: - u = session.merge(u) - o = session.merge(o) - job = u.job(j.dbjob) - self.assertEqual(job.status,"error") - self.assertEqual(len(job.messages),0) - diff -r 448dd8d36839 -r 3929338fd17f tests/reload.py --- a/tests/reload.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,14 +0,0 @@ -from .dbtestcase import DBTestCase - - -class TestReload(DBTestCase): - '''tests for reloading feature''' - - def testLog(self): - pass - testLog.todo = "reloading logfile is not implemented" - - def testOffers(self): - pass - testOffers.todo = "reloading offers test not implemented" - diff -r 448dd8d36839 -r 3929338fd17f tests/smstrade.py --- a/tests/smstrade.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,130 +0,0 @@ -from twisted.trial import unittest -from decimal import Decimal -from mock import patch, Mock - -from iro.error import NoRoute, NoTyp, NeededOption, RejectRecipient -from iro.telnumber import Telnumber -from iro.model.message import SMS -from iro.offer.smstrade import Smstrade, SmstradeException, StatusCode - - -HOST = "localhost" -PORT = 9999 - -class TestSMStradeProvider(unittest.TestCase): - - def getProvider(self, c=None): - _c={"key":"XXXXXXXX", - "typ":"smstrade", - } - - if c: - _c.update(c) - - ret = Smstrade("test") - ret.load(_c.items()) - return ret - - @patch("urllib.urlopen") - def testSendSMS(self,mock_urlopen): - f = Mock() - f.readlines.return_value = ["100","12345678","0.055","1"] - mock_urlopen.return_value = f - - params = ["key=XXXXXXXX","to=00491701234567", "message=Hello+World", "route=gold", "message_id=1", "cost=1","count=1",'charset=utf-8'] - params.sort() - - p=self.getProvider() - content = "Hello World" - r = p.send("gold", Telnumber("01701234567"), SMS(content,None)) - - ca = mock_urlopen.call_args[0] - c=ca[1].split("&") - c.sort() - - self.assertEqual(ca[0],"https://gateway.smstrade.de") - self.assertEqual(c,params) - self.assertEqual(f.readlines.call_count,1) - - self.assertEqual(r.provider, p) - self.assertEqual(r.route, 'gold') - self.assertEqual(r.costs, Decimal('0.055')) - self.assertEqual(r.exID, '12345678') - self.assertEqual(r.count, 1) - - def testStatusCode(self): - s = StatusCode(10,"12345678","1.10",1) - self.assertEqual(str(s),'10: Empfaengernummer nicht korrekt.') - self.assertEqual(int(s),10) - self.assertEqual(s.count,1) - self.assertEqual(s.costs,Decimal("1.10")) - self.assertEqual(s.exID,'12345678') - - - def testUnknownStatusCode(self): - s = StatusCode(999) - self.assertEqual(str(s),'999: unknown statuscode.') - self.assertEqual(int(s),999) - self.assertEqual(s.count,0) - self.assertEqual(s.costs,Decimal("0.00")) - self.assertEqual(s.exID, None) - - - - def testRejectRecipient(self): - p=self.getProvider() - content = "Hello World" - e = self.assertRaises(RejectRecipient, p.send, "basic", Telnumber("+331701234567"), SMS(content,None)) - self.assertEqual(str(e),'Reject recipient(00331701234567): None') - - @patch("urllib.urlopen") - def testRejectRecipient70(self,mock_urlopen): - f = Mock() - f.readlines.return_value = ["70"] - mock_urlopen.return_value = f - - p=self.getProvider() - content = "Hello World" - self.assertRaises(RejectRecipient, p.send , "basic", Telnumber("01701234567") ,SMS(content,None)) - - f.readlines.return_value = ["71"] - e = self.assertRaises(RejectRecipient, p.send , "basic", Telnumber("01701234567"), SMS(content,None)) - self.assertEqual(str(e),'Reject recipient(00491701234567): 71: Feature nicht ueber diese Route moeglich.') - - @patch("urllib.urlopen") - def testUnknwonStatuscode(self,mock_urlopen): - f = Mock() - f.readlines.return_value = ["703"] - mock_urlopen.return_value = f - - p=self.getProvider() - content = "Hello World" - e = self.assertRaises(SmstradeException, p.send , "basic", Telnumber("01701234567"), SMS(content,None)) - self.assertEqual(str(e),'950:Error in external API.\n703: unknown statuscode.') - - @patch("urllib.urlopen") - def testSmstradeException(self,mock_urlopen): - f = Mock() - f.readlines.return_value = ["10"] - mock_urlopen.return_value = f - - p=self.getProvider() - content = "Hello World" - e = self.assertRaises(SmstradeException, p.send , "basic", Telnumber("01701234567"), SMS(content,None)) - self.assertEqual(str(e),'950:Error in external API.\n10: Empfaengernummer nicht korrekt.') - - - - def testNeededOption(self): - s= self.getProvider() - self.assertEqual(s.key, "XXXXXXXX") - - self.assertRaises(NeededOption, s.load,[]) - - def testSendFunc(self): - s = self.getProvider() - p = s.getSendFunc("sms","basic") - self.assertEqual(p.func, s.send) - self.assertEqual(p.args, ("basic",)) - self.assertRaises(NoRoute,s.getSendFunc,"sms","foo") - self.assertRaises(NoTyp,s.getSendFunc,"mail2","basic") diff -r 448dd8d36839 -r 3929338fd17f tests/smtp.py --- a/tests/smtp.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,146 +0,0 @@ -from twisted.trial import unittest - -import email -from email.header import decode_header -import base64 -from smtp_helper import TestSMTPServer - -from mock import patch, Mock -import smtplib - -from iro.error import NoRoute, NoTyp, NeededOption -from iro.model.message import Mail -from iro.offer.smtp import SMTP - -HOST = "localhost" -PORT = 9999 - -class TestSMTPProvider(unittest.TestCase): - def setUp(self): - self.smtp_server = TestSMTPServer((HOST, PORT)) - self.smtp_server.start() - - def tearDown(self): - self.smtp_server.close() - - def getSMTP(self, c=None): - _c={"send_from":"send@t.de", - "host":HOST, - "port":PORT, - "typ":"smtp", - } - - if c: - _c.update(c) - - ret = SMTP("test") - ret.load(_c.items()) - return ret - - def testSendMail(self): - p=self.getSMTP() - content = "sadfadfgwertsdgsdf\n\nsdfgaerasdfsad\nadfasdf" - p.send("t@t.de", Mail("sub", content, None)) - - - self.assertEqual(len(self.smtp_server.rcvd), 1) - fromaddr, toaddrs, message = self.smtp_server.rcvd[0] - msg = email.message_from_string(message) - - self.assertEqual(fromaddr,"send@t.de") - self.assertEqual(msg.get_all("From"),["send@t.de"]) - self.assertEqual(toaddrs,["t@t.de"]) - self.assertEqual(msg.get_all("To"),["t@t.de"]) - self.assertEqual(decode_header(msg.get("Subject")),[("sub","utf-8")]) - self.assertEqual(base64.b64decode(msg.get_payload()),content) - - def testSendMailExtraFrm(self): - p=self.getSMTP() - content = "" - p.send("t@t.de", Mail("sub", content, "f@t.de")) - - self.assertEqual(len(self.smtp_server.rcvd), 1) - fromaddr, toaddrs, message = self.smtp_server.rcvd[0] - msg = email.message_from_string(message) - - self.assertEqual(fromaddr,"f@t.de") - self.assertEqual(msg.get_all("From"),["f@t.de"]) - - def testSendMailException(self): - p=self.getSMTP({"port":PORT-1}) - content = "" - self.assertRaises(IOError, p.send, "t@t.de", Mail("sub", content, "f@t.de")) - - self.assertEqual(len(self.smtp_server.rcvd), 0) - - @patch("smtplib.SMTP_SSL") - def testSSLSendMail(self,mock_ssl): - def se(*args): - return smtplib.SMTP(*args) - mock_ssl.side_effect=se - - p=self.getSMTP({"SSL":True}) - content = "sadfadfgwertsdgsdf\n\nsdfgaerasdfsad\nadfasdf" - p.send("t@t.de", Mail("sub", content, None)) - - self.assertEqual(mock_ssl.call_count,1) - - self.assertEqual(len(self.smtp_server.rcvd), 1) - - @patch("smtplib.SMTP") - def testTLSSendMail(self,mock_smtp): - mock_s = Mock() - mock_smtp.return_value = mock_s - - p=self.getSMTP({"TLS":True}) - content = "sadfadfgwertsdgsdf\n\nsdfgaerasdfsad\nadfasdf" - p.send("t@t.de", Mail("sub", content, None)) - - mock_s.starttls.assert_called_once_with() - self.assertEqual(mock_s.sendmail.call_count,1) - self.assertEqual([i[0] for i in mock_s.method_calls],["starttls","sendmail","quit"]) - - @patch("smtplib.SMTP") - def testLoginSendMail(self,mock_smtp): - mock_s = Mock() - mock_smtp.return_value = mock_s - - p=self.getSMTP({"user":"user","password":"pw"}) - content = "sadfadfgwertsdgsdf\n\nsdfgaerasdfsad\nadfasdf" - p.send("t@t.de", Mail("sub", content, None)) - - mock_s.login.assert_called_once_with("user","pw") - self.assertEqual(mock_s.sendmail.call_count,1) - self.assertEqual([i[0] for i in mock_s.method_calls],["login","sendmail","quit"]) - - - def testNeededOption(self): - c={"send_from":"send@t.de", - "host":HOST, - "port":PORT, - "user":"u", - "password":"p", - "typ":"smtp", - } - s = self.getSMTP(c) - self.assertEqual(s.send_from, "send@t.de") - self.assertEqual(s.host, HOST) - self.assertEqual(s.port, PORT) - self.assertEqual(s.user, "u") - self.assertEqual(s.password, "p") - self.assertEqual(s.SSL,False) - self.assertEqual(s.TLS,False) - - c.update({"TLS":True, "SSL":True}) - s = self.getSMTP(c) - self.assertEqual(s.SSL,True) - self.assertEqual(s.TLS,True) - - del c["host"] - self.assertRaises(NeededOption, s.load, c) - - def testSendFunc(self): - s = self.getSMTP() - self.assertEqual(s.getSendFunc("mail",None), s.send) - self.assertRaises(NoRoute,s.getSendFunc,"mail","foo") - self.assertRaises(NoTyp,s.getSendFunc,"mail2","foo") diff -r 448dd8d36839 -r 3929338fd17f tests/task.py --- a/tests/task.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,95 +0,0 @@ -from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks - -from Queue import deque - -from iro.model.schema import User, Offer as DBOffer, Userright -from iro.model.message import SMS -from iro.model.status import Status -from iro.model.offer import offers -from iro.model.job import exJobs - -from iro.controller.task import createJob, Task -from iro.controller.pool import taskPool, dbPool - -from iro.offer import Offer, Provider - -from iro.error import NoRouteForTask -from iro.telnumber import Telnumber - -from .dbtestcase import DBTestCase - -class TaskTestCase(DBTestCase): - def setUp(self): - DBTestCase.setUp(self) - dbPool.start(reactor) - - def tearDown(self): - exJobs.clear() - offers.clear() - dbPool.pool.stop() - taskPool.pool.q.queue = deque() - DBTestCase.tearDown(self) - -class TestTasks(TaskTestCase): - - @inlineCallbacks - def testCreateSMS(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - - job = yield createJob(u,[Telnumber('0123325456')],SMS('sms'),[]) - - self.assertEqual(taskPool.pool.q.qsize(),1) - - self.assertEqual(job.tasks.keys(),[Telnumber('0123325456')]) - self.assertIsInstance(job.tasks[Telnumber('0123325456')], Task) - - @inlineCallbacks - def testRun(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - o=DBOffer(name="test", provider="bla", route="basic", typ="sms") - u.rights.append(Userright(o)) - - p=Provider(name="p", typs={"sms":["test",]}) - def send(typ,route,recipient,message): - return Status(provider=p, route=route) - p.send=send - offers["test"] = Offer("test",provider=p, route="test", typ="sms") - - exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) - - task=Task(Telnumber('123456789'), exjob) - ret = yield task._run() - self.assertIsInstance(ret, Status) - self.assertEqual(ret.provider, p) - self.assertEqual(ret.route, "test") - - @inlineCallbacks - def testNoRoute(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(u) - - exjob = yield exJobs.create(u, [Telnumber('123456789')], SMS('test'), []) - - task=Task(Telnumber('123456789'), exjob) - d = task._run() - self.assertFailure(d, NoRouteForTask) - - def testSetStatus(self): - task=Task(Telnumber('123456789'), None) - self.assertEqual(task.status,None) - self.assertEqual(task.error,False) - - self.assertEqual(task.setStatus("fooja"),"fooja") - self.assertEqual(task.status,"fooja") - - def testSetError(self): - task=Task(Telnumber('123456789'), None) - self.assertEqual(task.setError("fooja"),"fooja") - self.assertEqual(task.status,"fooja") - self.assertEqual(task.error,True) diff -r 448dd8d36839 -r 3929338fd17f tests/telnumber.py --- a/tests/telnumber.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,80 +0,0 @@ -import unittest - -from iro.validate import vTel -from iro.error import InvalidTel -from iro.telnumber import Telnumber - -class testTelefonnumbers(unittest.TestCase): - """tests for telefonnumbers""" - - def testMultipleTelnumbers(self): - '''test the telefon validator''' - ret = vTel(["0123/456(78)","+4912346785433","00123435456-658"], None) - x=[Telnumber('+4912345678'),Telnumber('012346785433'),Telnumber('+123435456658')] - self.assertEqual(ret,x) - - def testInvalidTelnumbers(self): - '''invalid telnumbers''' - - numbers=['xa','+1','1-23',';:+0','0123'] - - for number in numbers: - with self.assertRaises(InvalidTel) as e: - vTel([number], None) - self.assertEqual(e.exception.number,number) - - with self.assertRaises(InvalidTel) as e: - vTel(['01234']+numbers, None) - self.assertEqual(e.exception.number,numbers[0]) - - def testDoubles(self): - ret = vTel(["0123/456(78)","+4912345678","004912345678"], None) - x=[Telnumber('+4912345678')] - self.assertEqual(ret,x) - - def equalNumber(self, tel1, tel2): - self.assertEqual(tel1.number, tel2.number) - self.assertEqual(tel1.land, tel2.land) - - def testWrongNumber(self): - telnum=Telnumber() - self.assertRaises(InvalidTel, telnum.createNumber, "hallo") - self.assertRaises(InvalidTel, telnum.createNumber, "0?242") - - def testNumber(self): - telnum=Telnumber("0345-94103640") - telnum2=Telnumber("+49345/94103640") - telnum3=Telnumber("00493459410364-0") - telnum4=Telnumber("+49(0)345-94103640") - - self.assertEqual(telnum.land, "49") - self.assertEqual(telnum.number, "34594103640") - - self.equalNumber(telnum, telnum2) - self.equalNumber(telnum, telnum3) - self.equalNumber(telnum, telnum4) - - def testEqual(self): - telnum=Telnumber("0345-94103640") - telnum2=Telnumber("+49345/94103640") - li=[] - self.assertEqual(telnum == telnum2, True) - self.assertEqual(telnum <> telnum2, False) - self.assertEqual(telnum, telnum2) - self.assertEqual(telnum in li,False) - li.append(telnum) - self.assertEqual(telnum in li,True) - self.assertEqual(telnum2 in li,True) - - def testHash(self): - telnum=Telnumber("0345-94103640") - self.assertEqual(hash(telnum),hash("004934594103640")) - self.assertNotEqual(hash(telnum),hash("004934594103641")) - - def testString(self): - telnum=Telnumber("0345-94103640") - self.assertEqual(str(telnum),"004934594103640") - - def testRepr(self): - telnum=Telnumber("0345-94103640") - self.assertEqual(repr(telnum),"") diff -r 448dd8d36839 -r 3929338fd17f tests/test.pdf Binary file tests/test.pdf has changed diff -r 448dd8d36839 -r 3929338fd17f tests/validate.py --- a/tests/validate.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,135 +0,0 @@ -from twisted.trial import unittest -from mock import Mock - -from iro.validate import vBool, vInteger, vHash, validate -from iro.error import ValidateException - -class testValidators(unittest.TestCase): - '''test for simple validators''' - - def testBool(self): - self.assertEqual(vBool(True,None),True) - self.assertEqual(vBool(1,None),True) - self.assertEqual(vBool("true",None),True) - self.assertEqual(vBool("True",None),True) - self.assertEqual(vBool("TRUE",None),True) - - self.assertEqual(vBool(False,None),False) - self.assertEqual(vBool(0,None),False) - self.assertEqual(vBool("false",None),False) - self.assertEqual(vBool("False",None),False) - self.assertEqual(vBool("FALSE",None),False) - - e = self.assertRaises(ValidateException, vBool, "TRue","test") - self.assertEqual(e.msg,"test is not boolean") - - def testInteger(self): - self.assertEqual(vInteger(123,None),123) - self.assertEqual(vInteger("123",None),123) - self.assertEqual(vInteger("-123",None),-123) - - self.assertRaises(ValidateException, vInteger, "a123",None) - - def testIntegerLimits(self): - self.assertEqual(vInteger(10,None,maxv=10),10) - self.assertRaises(ValidateException, vInteger, 11, None, maxv=10) - - self.assertEqual(vInteger(4,None,minv=4),4) - self.assertRaises(ValidateException, vInteger, 3, None, minv=4) - - self.assertRaises(ValidateException, vInteger, -1, None, minv=0) - self.assertRaises(ValidateException, vInteger, 1, None, maxv=0) - - def testIntegerNoneAllowed(self): - self.assertEqual(vInteger(None,None,none_allowed=True),None) - self.assertEqual(vInteger('',None,none_allowed=True),None) - - self.assertRaises(ValidateException, vInteger, "", None) - self.assertRaises(ValidateException, vInteger, None, None) - - def testHash(self): - self.assertEqual(vHash("0123456789abcdef",None),"0123456789abcdef") - self.assertEqual(vHash("0123456789ABCDEF",None),"0123456789abcdef") - self.assertEqual(vHash("F",None),"f") - self.assertEqual(vHash("",None),'') - - self.assertRaises(ValidateException, vHash, "GHIJKL", None) - - def testHashLimits(self): - self.assertEqual(vHash("F",None,minlength=1),"f") - self.assertRaises(ValidateException, vHash, "", None, minlength=1) - - self.assertEqual(vHash("Fa",None,maxlength=2),"fa") - self.assertRaises(ValidateException, vHash, "123", None, maxlength=1) - - - def testValidate(self): - f = Mock() - f.return_value = "valid" - @validate("t",f,True,1,2,3,4,k=5) - def g(u=False, t="bla"): - return t - d = g(t="uhuhu") - def r(t): - f.called_once_with("uhuhu","t",1,2,3,4,k=5) - self.assertEqual(t,"valid") - d.addCallback(r) - return d - - def testValidateMissingNeed(self): - f = Mock() - @validate("t",f,True,1,2,3,4,k=5) - def g(u, t="buuh"): - return t - e = self.assertRaises(ValidateException, g, u="uhuhu", t = None) - self.assertEqual(str(e),'700:t is nessasary') - - def testValidateMissingNeedNonExplicit(self): - f = Mock() - @validate("t",f,True,1,2,3,4,k=5) - def g(u, **k): - return k["t"] - e = self.assertRaises(ValidateException, g, u="uhuhu") - self.assertEqual(str(e),'700:t is nessasary') - - - def testValidateMissingNeed2(self): - f = Mock() - f.return_value = "valid" - @validate("t",f,True,1,2,3,4,k=5) - def g(u, t="buuh"): - return t - - d = g(True) - - def r(t): - f.called_once_with("buuh","t",1,2,3,4,k=5) - self.assertEqual(t,"valid") - d.addCallback(r) - return d - - def testvalidateNoNeed(self): - f = Mock() - f.return_value = "valid" - @validate("t",f,False,1,2,3,4,k=5) - def g(u, t="buuh"): - return t - d = g("uhu") - def r(t): - self.assertEqual(f.called,True) - self.assertEqual(t,"valid") - d.addCallback(r) - return d - - def testvalidateNoNeed2(self): - f = Mock() - f.return_value = "valid" - @validate("t",f,False,1,2,3,4,k=5) - def g(u, **k): - return k["t"] - d = g("uhu") - def r(t): - self.assertEqual(f.called,False) - self.assertEqual(t,None) - d.addCallback(r) - return d diff -r 448dd8d36839 -r 3929338fd17f tests/viewinterface.py --- a/tests/viewinterface.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,269 +0,0 @@ -from twisted.internet.defer import inlineCallbacks -from datetime import datetime -from Queue import deque - -from iro.model.schema import User, Offer, Userright, Job, Message -from iro.controller.viewinterface import Interface -from iro.controller.pool import taskPool - -from iro.model.message import SMS, Fax, Mail -from iro.model.pool import data -from iro.model.offer import offers -from iro.model.job import exJobs - -import iro.error as IroError - -from .dbtestcase import DBTestCase - -class DummyPool(): - def run(self, f,*a,**k): - return f(*a,**k) - -class ViewInterfaceTest(DBTestCase): - """tests for the xmlrpc interface""" - def setUp(self): - DBTestCase.setUp(self) - self.pool = data.pool - data.pool = DummyPool() - - def tearDown(self): - exJobs.clear() - offers.clear() - taskPool.pool.q.queue = deque() - data.pool = self.pool - self.pool = None - DBTestCase.tearDown(self) - - @inlineCallbacks - def testStatus(self): - ''' test the status function''' - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(User(name='test',apikey='abcdef123456789')) - st = yield Interface().status('abcdef123456789') - self.assertEqual(st, {}) - - with self.session() as session: - u = session.merge(u) - j = Job(info='info', status="started") - j.user=u - session.add(j) - session.commit() - jid=j.id - status = {str(jid):{"status":"started"}} - st = yield Interface().status('abcdef123456789',jid) - self.assertEqual(st, status) - st = yield Interface().status('abcdef123456789') - self.assertEqual(st, status) - st = yield Interface().status('abcdef123456789', '', 'false') - self.assertEqual(st, status) - st = yield Interface().status('abcdef123456789', '', 0) - self.assertEqual(st, status) - - #JobNotFound - d = Interface().status('abcdef123456789',jid+1) - self.assertFailure(d, IroError.JobNotFound) - yield d - - #self.assertEqual(self.__rpc2().status('abcdef123456789','abcde', True), ["",'abcde', True]) - #self.assertEqual(self.__rpc2().status('abcdef123456789', '', 'true'), ["", '', True]) - #self.assertEqual(self.__rpc2().status('abcdef123456789', '', 1), ["", '', True]) - - def testNoSuchUser(self): - '''a unknown user should raise a UserNotNound Exception - bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception''' - d = Interface().status('abcdef123456789') - self.assertFailure(d, IroError.UserNotFound) - return d - - - def testValidationFault(self): - '''a validate Exception should be translated to a xmlrpclib.Fault.''' - d = Interface().status('xxx') - self.assertFailure(d, IroError.ValidateException) - - @inlineCallbacks - def testRoutes(self): - '''test the route function''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - r = yield Interface().routes('abcdef123456789','sms') - self.assertEqual(r, ['sipgate_basic']) - - d = Interface().routes('abcdef123456789','fax') - self.assertFailure(d,IroError.ValidateException) - yield d - - with self.session() as session: - o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") - u = session.query(User).filter_by(name="test").first() - u.rights.append(Userright(o)) - o=Offer(name="faxde", provider="faxde", route="", typ="fax") - session.add(o) - session.commit() - - r = yield Interface().routes('abcdef123456789','sms') - self.assertEqual(r, ['sipgate_basic','sipgate_plus']) - r = yield Interface().routes('abcdef123456789','fax') - self.assertEqual(r, []) - - - with self.session() as session: - u = session.query(User).filter_by(name="test").first() - u.rights.append(Userright(o)) - - r = yield Interface().routes('abcdef123456789','sms') - self.assertEqual(r, ['sipgate_basic','sipgate_plus']) - r = yield Interface().routes('abcdef123456789','fax') - self.assertEqual(r, ['faxde']) - - @inlineCallbacks - def testDefaultRoutes(self): - '''test the defaultRoute function''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o,True)) - o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - r = yield Interface().defaultRoute('abcdef123456789','sms') - self.assertEqual(r, ['sipgate_basic']) - - @inlineCallbacks - def testTelnumbers(self): - '''test the telefon validator''' - r = yield Interface().telnumber(["0123/456(78)","+4912346785433","00123435456-658"]) - self.assertEqual(r, True) - - invalid=['xa','+1','1-23',';:+0','0123'] - - d = Interface().telnumber(['01234']+invalid) - def x(failure): - self.assertEqual(failure.getErrorMessage(),"701:No valid telnumber: '%s'"%invalid[0]) - failure.raiseException() - d.addErrback(x) - self.assertFailure(d, IroError.InvalidTel) - yield d - - @inlineCallbacks - def testVaildEmail(self): - '''test vaild email adresses (got from wikipedia)''' - validmails=["niceandsimple@example.com"] - r = yield Interface().email(validmails) - self.assertEqual(r,True) - - def testInvaildEmail(self): - '''test invaild email adresses (got from wikipedia)''' - invalid=["Abc.example.com",] - d = Interface().email(invalid) - self.assertFailure(d, IroError.InvalidMail) - return d - - @inlineCallbacks - def testBill(self): - '''test bill function''' - apikey='abcdef123456789' - with self.session() as session: - u=User(name='test',apikey=apikey) - session.add(u) - - r = yield Interface().bill(apikey) - self.assertEqual(r,{'total':{'price':0.0,'anz':0}}) - - with self.session() as session: - u = session.merge(u) - o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") - u.rights.append(Userright(o)) - j = Job(info='i',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) - u.jobs.append(j) - - j = Job(info='a',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o)) - u.jobs.append(j) - - ret=yield Interface().bill(apikey) - self.assertEqual(ret['total'],{'price':0.8,'anz':2}) - self.assertEqual(ret['sipgate_basic'], - {'price':0.8,'anz':2, - 'info':{'i':{'price':0.4,'anz':1}, - 'a':{'price':0.4,'anz':1}, - } - }) - - @inlineCallbacks - def testSMS(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") - u.rights.append(Userright(o)) - session.add(u) - - jobid = yield Interface().sms('abcdef123456789','message',['0123325456'],['sipgate_basic']) - - with self.session() as session: - u = session.merge(u) - job = u.job(jobid) - exJob = job.extend - - self.assertEqual(exJob.message,SMS("message",None)) - self.assertEqual(taskPool.pool.q.qsize(),1) - - - @inlineCallbacks - def testMail(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - o = Offer(name='loc',provider="localhost",route="",typ="mail") - u.rights.append(Userright(o)) - session.add(u) - - jobid = yield Interface().mail('abcdef123456789','sub', "hey body!", ['t@te.de'], "frm@t.de" ,['loc']) - with self.session() as session: - u = session.merge(u) - job = u.job(jobid) - exJob = job.extend - - self.assertEqual(exJob.message,Mail("sub",'hey body!','frm@t.de')) - self.assertEqual(taskPool.pool.q.qsize(),1) - - @inlineCallbacks - def testMailFrmNone(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - o = Offer(name='loc',provider="localhost",route="",typ="mail") - u.rights.append(Userright(o)) - session.add(u) - - jobid = yield Interface().mail('abcdef123456789','sub', "hey body!", ['t@te.de'], None,['loc']) - with self.session() as session: - u = session.merge(u) - job = u.job(jobid) - exJob = job.extend - - self.assertEqual(exJob.message,Mail("sub",'hey body!',None)) - self.assertEqual(taskPool.pool.q.qsize(),1) - - - - @inlineCallbacks - def testFax(self): - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - o = Offer(name='b',provider="sipgate",route="b",typ="fax") - u.rights.append(Userright(o)) - session.add(u) - - jobid = yield Interface().fax('abcdef123456789','subject', 'blublbubblu',['0123325456'],['b']) - - with self.session() as session: - u = session.merge(u) - job = u.job(jobid) - exJob = job.extend - - self.assertEqual(exJob.message,Fax("subject","blublbubblu")) - self.assertEqual(taskPool.pool.q.qsize(),1) diff -r 448dd8d36839 -r 3929338fd17f tests/xmlrpc.py --- a/tests/xmlrpc.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,216 +0,0 @@ -from multiprocessing import Process -import unittest - -from datetime import datetime - -import time - -from xmlrpclib import Server as xServer, ServerProxy, Fault - -from iro.model.schema import User, Offer, Userright, Job, Message - -from iro.main import runReactor - -import iro.error as IroError - -from .dbtestcase import DBTestCase - - -class XMLRPCTest(DBTestCase): - """tests for the xmlrpc interface""" - def setUp(self): - DBTestCase.setUp(self) - self.s = Process(target=startReactor, args=(self.engine,)) - self.s.start() - #the new process needs time to get stated, so this process has to sleep - time.sleep(.2) - - def tearDown(self): - self.__debug().stop() - time.sleep(.2) - self.s.join() - DBTestCase.tearDown(self) - - def __debug(self): - return xServer('http://localhost:7080/debug') - - def __rpc2(self): - return ServerProxy('http://localhost:7080/RPC2') - - def testDebugHello(self): - '''simple test for the connection to xmlrpc server''' - ret=self.__debug().hello() - self.failUnlessEqual(ret,'hello') - - def testListMethods(self): - '''list of all offical Methods, that can be executed''' - ret=self.__rpc2().listMethods() - self.failUnlessEqual(ret, ['listMethods', 'status', 'stop', 'sms', 'fax', 'mail', 'routes', 'defaultRoute', 'bill', 'telnumber','email']) - - def testStatus(self): - ''' test the status function''' - with self.session() as session: - u = User(name='test',apikey='abcdef123456789') - session.add(User(name='test',apikey='abcdef123456789')) - self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), {}) - - with self.session() as session: - u = session.merge(u) - j = Job(info='info', status="started") - j.user=u - session.add(j) - session.commit() - jid=j.id - status = {str(jid):{"status":"started"}} - self.failUnlessEqual(self.__rpc2().status('abcdef123456789',jid), status) - self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), status) - self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'false'), status) - self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 0), status) - - #JobNotFound - exc = self.assertRaises(Fault, self.__rpc2().status, 'abcdef123456789',jid+1) - unf = IroError.JobNotFound() - self.failUnlessEqual(exc.faultCode, unf.code) - self.failUnlessEqual(exc.faultString, unf.msg) - - #self.failUnlessEqual(self.__rpc2().status('abcdef123456789','abcde', True), ["",'abcde', True]) - #self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'true'), ["", '', True]) - #self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 1), ["", '', True]) - - def testNoSuchUser(self): - '''a unknown user should raise a UserNotNound Exception - bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception''' - exc = self.assertRaises(Fault, self.__rpc2().status, 'abcdef123456789') - unf=IroError.UserNotFound() - self.failUnlessEqual(exc.faultCode, unf.code) - self.failUnlessEqual(exc.faultString, unf.msg) - - def testNoSuchMethod(self): - '''a unknown mothod should raise a Exception ''' - exc = self.assertRaises(Fault, self.__rpc2().nosuchmethod) - self.failUnlessEqual(exc.faultCode, 8001) - self.failUnlessEqual(exc.faultString, "procedure nosuchmethod not found") - - def testValidationFault(self): - '''a validate Exception should be translated to a xmlrpclib.Fault.''' - exc = self.assertRaises(Fault, self.__rpc2().status,'xxx') - self.failUnlessEqual(exc.faultCode, 700) - self.failUnlessEqual(exc.faultString, "Validation of 'apikey' failed.") - - def testRoutes(self): - '''test the route function''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic']) - - exc = self.assertRaises(Fault, self.__rpc2().routes,'abcdef123456789','fax') - self.failUnlessEqual(exc.faultCode, 700) - self.failUnlessEqual(exc.faultString, "Typ is not valid.") - - with self.session() as session: - o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") - u = session.query(User).filter_by(name="test").first() - u.rights.append(Userright(o)) - o=Offer(name="faxde", provider="faxde", route="", typ="fax") - session.add(o) - session.commit() - self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus']) - self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),[]) - - with self.session() as session: - u = session.query(User).filter_by(name="test").first() - u.rights.append(Userright(o)) - - self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus']) - self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),['faxde']) - - def testDefaultRoutes(self): - '''test the defaultRoute function''' - with self.session() as session: - u=User(name='test',apikey='abcdef123456789') - o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms") - u.rights.append(Userright(o,True)) - o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms") - u.rights.append(Userright(o)) - session.add(u) - self.failUnlessEqual(self.__rpc2().defaultRoute('abcdef123456789','sms'),['sipgate_basic']) - - def testTelnumbers(self): - '''test the telefon validator''' - self.failUnlessEqual(self.__rpc2().telnumber(["0123/456(78)","+4912346785433","00123435456-658"]),True) - - invalid=['xa','+1','1-23',';:+0','0123'] - - exc = self.assertRaises(Fault, self.__rpc2().telnumber,['01234']+invalid) - self.failUnlessEqual(exc.faultCode, 701) - self.failUnlessEqual(exc.faultString, "No valid telnumber: '%s'" % invalid[0]) - - - def testVaildEmail(self): - '''test vaild email adresses (got from wikipedia)''' - validmails=["niceandsimple@example.com"] - self.failUnlessEqual(self.__rpc2().email(validmails),True) - - def testInvaildEmail(self): - '''test invaild email adresses (got from wikipedia)''' - invalid=["Abc.example.com",] - exc= self.assertRaises(Fault, self.__rpc2().email, invalid) - self.failUnlessEqual(exc.faultCode, 702) - self.failUnlessEqual(exc.faultString, "No valid email: '%s'" % invalid[0]) - - def testBill(self): - '''test bill function''' - apikey='abcdef123456789' - with self.session() as session: - u=User(name='test',apikey=apikey) - session.add(u) - - self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.0,'anz':0}}) - - with self.session() as session: - u = session.merge(u) - o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms") - u.rights.append(Userright(o)) - j = Job(info='i',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o)) - u.jobs.append(j) - - j = Job(info='a',status='sended') - j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o)) - u.jobs.append(j) - - ret=self.__rpc2().bill(apikey) - self.failUnlessEqual(ret['total'],{'price':0.8,'anz':2}) - self.failUnlessEqual(ret['sipgate_basic'], - {'price':0.8,'anz':2, - 'info':{'i':{'price':0.4,'anz':1}, - 'a':{'price':0.4,'anz':1}, - } - }) - - -def startReactor(engine): - """starts the Rector with a special debug Clild, so that the reactor can be stopped remotly. """ - from twisted.internet import reactor - from twisted.web import xmlrpc, resource - - from iro.view.xmlrpc import appendResource - - class XMLRPCDebug(xmlrpc.XMLRPC): - def xmlrpc_stop(self): - reactor.callLater(0.1,reactor.stop) - return "" - - def xmlrpc_hello(self): - return "hello" - - root = resource.Resource() - root = appendResource(root) - root.putChild('debug', XMLRPCDebug()) - runReactor(reactor, engine, root) - -if __name__ == '__main__': - unittest.main() diff -r 448dd8d36839 -r 3929338fd17f tests/xmlrpc_client.py --- a/tests/xmlrpc_client.py Sun Mar 18 14:05:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,12 +0,0 @@ -from multiprocessing.pool import ThreadPool -import xmlrpclib -import timeit - - -def x(i): - xmlrpclib.ServerProxy('http://192.168.56.101:7080/RPC2').status('abcdef123456789') - -pool=ThreadPool(50) - -print min(timeit.repeat(lambda:pool.map(x,range(51)),number=1,repeat=1)) -