--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/xmlrpc/AuthentificateXMLRPCServer.py Thu Oct 22 10:00:01 2009 +0200
@@ -0,0 +1,76 @@
+# Server code
+import SimpleXMLRPCServer
+import string,base64
+
+class AuthentificateXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
+ def do_POST(self):
+ try:
+ header = self.headers['Authorization']
+ type, user_passwd = header.split()
+ username, password = string.split(base64.decodestring(user_passwd), ':')
+ if self.testUser(username,password):
+ SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
+ else:
+ self.report_error(401)
+ except:
+ self.report_error(401)
+
+ def report_error (self,code):
+ '''
+ Send back an errorcode
+ '''
+ #it is important to read out the complete sended request ,
+ # but throw the data away, because an error should be send back
+ try:
+ max_chunk_size = 10*1024*1024
+ size_remaining = int(self.headers["content-length"])
+ while size_remaining:
+ chunk_size = min(size_remaining, max_chunk_size)
+ size_remaining -= len(self.rfile.read(chunk_size))
+ except:
+ pass
+
+ #now just send the error back
+ special_errors={401:self.report_401,
+ 404:self.report_404}
+ if special_errors.has_key(code):
+ special_errors[code]()
+ else:
+ self.send_response(code)
+ self.end_headers()
+ self.connection.shutdown(1)
+
+ def report_401(self):
+ self.send_response(401)
+ self.send_header("WWW-Authenticate", 'Basic realm="%s"'% self.server.relam)
+ response = 'Unauthorised'
+ self.send_header("Content-type", "text/plain")
+ self.send_header("Content-length", str(len(response)))
+ self.end_headers()
+ self.wfile.write(response)
+ # shut down the connection
+ self.wfile.flush()
+ self.connection.shutdown(1)
+
+ def testUser(self,username,password):
+ """
+ Function for testing authentification
+ """
+ if username=="test" and password=="test":
+ return True
+
+ return False
+
+
+
+def test():
+ server = SimpleXMLRPCServer.SimpleXMLRPCServer(("localhost", 8000),AuthentificateXMLRPCRequestHandler)
+ server.relam="xmlrpc"
+ server.register_introspection_functions()
+ server.register_function(lambda x: x*x, 'square')
+ server.serve_forever()
+
+if __name__ == '__main__':
+ test()
+
+