--- a/iro/model/offer.py Mon Feb 13 17:59:11 2012 +0100
+++ b/iro/model/offer.py Mon Feb 13 18:00:27 2012 +0100
@@ -1,24 +1,18 @@
from .dbdefer import dbdefer
+from .schema import Offer as DBOffer
from ..config import config
-def getPossibleOffers(recipient, os):
- ret = []
- for offer in os:
- ret.append(offers[offer])
- return ret
-
@dbdefer
def extendProvider(session, user, typ, providers):
user = session.merge(user)
ret = []
for p in providers:
- if p in offers.keys() and p not in ret and user.has_right(typ, offer_name = p):
+ if p not in ret and user.has_right(typ, offer_name = p):
ret.append(p)
- elif user.has_right(typ,provider=p):
- rs = [route for route in offers[p].routes if route not in ret]
- for r in rs:
+ elif user.providers(typ).filter(DBOffer.provider==p).first():
+ for r in offers[p].routes:
n = user.has_right(typ, provider=p, route=r)
- if n:
+ if n and n not in ret:
ret.append(n)
return ret
@@ -26,6 +20,7 @@
pass
offers={}
+providers={}
config.registerReload(loadOffers)
--- a/iro/model/schema.py Mon Feb 13 17:59:11 2012 +0100
+++ b/iro/model/schema.py Mon Feb 13 18:00:27 2012 +0100
@@ -119,7 +119,7 @@
def has_right(self, typ, offer_name = None, provider = None, route = None):
'''returns offer_name, if the user is allowed to use offer otherwise None
- !if there are more than one possible solution it only returns the first!'''
+ ->raise sqlalchemy.orm.exc.MultipleResultsFound if not a single offer match'''
filters=[User.name == self.name,
Offer.typ == typ,
]
--- a/tests/db.py Mon Feb 13 17:59:11 2012 +0100
+++ b/tests/db.py Mon Feb 13 18:00:27 2012 +0100
@@ -1,5 +1,5 @@
import unittest
-
+from sqlalchemy.orm.exc import MultipleResultsFound
from datetime import datetime
from iro.model.schema import User, Offer, Userright, Job, Message
@@ -22,6 +22,17 @@
u=session.merge(u)
self.failUnlessEqual(u.routes('sms').all(),[('sipgate_basic',),])
+ def testProviders(self):
+ '''test providers'''
+ with self.session() as session:
+ u=User(name='test',apikey='abcdef123456789')
+ o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms")
+ u.rights.append(Userright(o))
+ session.add(u)
+
+ with self.session() as session:
+ u=session.merge(u)
+ self.failUnlessEqual(u.providers('sms').all(),[('sipgate',),])
def testTyps(self):
with self.session() as session:
@@ -39,6 +50,58 @@
with self.session() as session:
self.failUnlessEqual(session.typs.order_by(Offer.typ).all(),[('sms',),('sms2',)])
+class Has_RightTests(DBTestCase):
+ '''test User.has_right'''
+ def testSimple(self):
+ '''test a very simple case'''
+ with self.session() as session:
+ u=User(name='test',apikey='abcdef123456789')
+ o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms")
+ u.rights.append(Userright(o))
+ session.add(u)
+
+ with self.session() as session:
+ u=session.merge(u)
+ self.failUnlessEqual(u.has_right("sms"),"sipgate_basic")
+ self.failUnlessEqual(u.has_right("sms", offer_name="sipgate_basic"),"sipgate_basic")
+ self.failUnlessEqual(u.has_right("sms", route="basic"),"sipgate_basic")
+ self.failUnlessEqual(u.has_right("sms", provider="sipgate"),"sipgate_basic")
+
+ def testDouble(self):
+ '''now we have different routes'''
+ with self.session() as session:
+ u=User(name='test',apikey='abcdef123456789')
+ o=Offer(name="b", provider="sipgate", route="basic", typ="sms")
+ u.rights.append(Userright(o))
+ o=Offer(name="c", provider="sipgate", route="c", typ="sms")
+ u.rights.append(Userright(o))
+ session.add(u)
+
+ with self.session() as session:
+ u=session.merge(u)
+ self.failUnlessEqual(u.has_right("sms", offer_name="b"),"b")
+ self.failUnlessEqual(u.has_right("sms", route="basic"),"b")
+ self.failUnlessEqual(u.has_right("sms", offer_name="c"),"c")
+ self.failUnlessEqual(u.has_right("sms", route="c"),"c")
+ self.assertRaises(MultipleResultsFound, u.has_right,"sms")
+ self.assertRaises(MultipleResultsFound, u.has_right,"sms", provider="sipgate")
+
+ def testUnknown(self):
+ ''' a unknown typ'''
+ with self.session() as session:
+ u=User(name='test',apikey='abcdef123456789')
+ o=Offer(name="b", provider="sipgate", route="basic", typ="sms")
+ u.rights.append(Userright(o))
+ o=Offer(name="c", provider="sipgate", route="c", typ="sms")
+ u.rights.append(Userright(o))
+ session.add(u)
+
+ with self.session() as session:
+ u=session.merge(u)
+ self.failUnlessEqual(u.has_right("fax", offer_name="b"), None)
+ self.failUnlessEqual(u.has_right("fax"), None)
+
+
class BillTest(DBTestCase):
"""test the bill function"""
--- a/tests/offer.py Mon Feb 13 17:59:11 2012 +0100
+++ b/tests/offer.py Mon Feb 13 18:00:27 2012 +0100
@@ -48,6 +48,9 @@
ret = yield offer.extendProvider(u, "sms", ["blub"])
self.assertEqual(ret, ["blub"])
+ ret = yield offer.extendProvider(u, "sms", ["blub", "blub"])
+ self.assertEqual(ret, ["blub"])
+
@inlineCallbacks
def testExtendProviderProvider(self):
'''test the extendProvider Function (provider known)'''
@@ -58,5 +61,30 @@
u.rights.append(Userright(o))
offer.offers={"bla":Offer("bla",["a","b","c"])}
- ret = yield offer.extendProvider(u, "sms", ["bla"])
- self.assertEqual(ret, ["oh"])
+
+ for l in [['bla'],['oh'],['oh','bla'],['bla','oh']]:
+ ret = yield offer.extendProvider(u, "sms", l)
+ self.assertEqual(ret, ["oh"])
+
+ @inlineCallbacks
+ def testExtendProviderDouble(self):
+ '''test the extendProvider Function (provider and name doubles)'''
+ with self.session() as session:
+ u = User(name='test',apikey='abcdef123456789')
+ session.add(u)
+ o=DBOffer(name="oh", provider="bla", route="a", typ="sms")
+ u.rights.append(Userright(o))
+ o=DBOffer(name="a", provider="bla", route="b", typ="sms")
+ u.rights.append(Userright(o))
+
+ offer.offers={"bla":Offer("bla",["a","b","c"])}
+
+ ret = yield offer.extendProvider(u, "sms", ["bla"])
+ self.assertEqual(ret, ["oh","a"])
+
+ ret = yield offer.extendProvider(u, "sms", ["a","bla"])
+ self.assertEqual(ret, ["a","oh"])
+
+ ret = yield offer.extendProvider(u, "sms", ["bla", "a"])
+ self.assertEqual(ret, ["oh","a"])
+