# Server code
import SimpleXMLRPCServer
import string,base64
class AuthentificateXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    """XML-RPC request handler that requires HTTP Basic authentication.

    Every POST must carry an ``Authorization: Basic <base64(user:password)>``
    header whose credentials are accepted by :meth:`testUser`; otherwise the
    request body is discarded and a 401 response is sent.

    The realm advertised in the ``WWW-Authenticate`` header is read from the
    ``relam`` attribute of the server object (spelling as used by this file),
    so the server must define that attribute before serving requests.
    """

    def do_POST(self):
        """Handle a POST: dispatch the XML-RPC call if authenticated, else 401."""
        credentials = self._parse_credentials(self.headers.get('Authorization'))

        authorized = False
        if credentials is not None:
            try:
                authorized = bool(self.testUser(*credentials))
            except Exception:
                # Fail closed: an overridden testUser() that raises (e.g. a
                # KeyError for an unknown user) denies access, as before.
                authorized = False

        if authorized:
            # Deliberately outside any try/except: failures in the real
            # dispatch must not be turned into a second (401) response.
            SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
        else:
            self.report_error(401)

    def _parse_credentials(self, header):
        """Extract ``(username, password)`` from a Basic ``Authorization`` value.

        Returns ``None`` when the header is missing, uses a scheme other than
        Basic, is not valid base64, or contains no ``:`` separator.
        """
        if not header:
            return None
        parts = header.split()
        if len(parts) != 2:
            return None
        scheme, encoded = parts
        if scheme.lower() != 'basic':
            return None
        try:
            decoded = base64.b64decode(encoded)
            if not isinstance(decoded, str):
                # b64decode yields bytes on Python 3; normalise to text so the
                # comparison in testUser() works on either interpreter.
                decoded = decoded.decode('utf-8')
        except (TypeError, ValueError):
            # Malformed base64 (TypeError on Python 2, binascii.Error /
            # UnicodeDecodeError -- both ValueError subclasses -- on Python 3).
            return None
        # Only the first colon separates user from password; the password
        # itself may legitimately contain further colons.
        username, separator, password = decoded.partition(':')
        if not separator:
            return None
        return username, password

    def _discard_request_body(self):
        """Read and throw away the request body announced by Content-Length.

        The complete request has to be consumed before an error response is
        sent. Stops early when the peer closes the connection, so a short
        body cannot stall the handler.
        """
        try:
            size_remaining = int(self.headers.get("content-length", 0))
        except (TypeError, ValueError):
            return
        max_chunk_size = 10 * 1024 * 1024
        try:
            while size_remaining > 0:
                chunk = self.rfile.read(min(size_remaining, max_chunk_size))
                if not chunk:
                    # EOF before the advertised length: nothing more to drain.
                    break
                size_remaining -= len(chunk)
        except EnvironmentError:
            # Best effort only; the error response is still attempted.
            pass

    def report_error(self, code):
        """Drain the request body, then send back the HTTP error ``code``.

        401 and 404 are delegated to ``report_401`` / ``report_404``; any
        other code gets a bare status line and the connection is shut down
        for writing.
        """
        self._discard_request_body()

        special_errors = {401: self.report_401,
                          404: self.report_404}
        if code in special_errors:
            special_errors[code]()
        else:
            self.send_response(code)
            self.end_headers()
            self.connection.shutdown(1)

    def report_401(self):
        """Send a 401 response that challenges the client for Basic credentials."""
        # Bytes literal: identical to the plain string on Python 2 and the
        # type a socket file object expects.
        response = b'Unauthorised'
        self.send_response(401)
        self.send_header("WWW-Authenticate", 'Basic realm="%s"' % self.server.relam)
        self.send_header("Content-type", "text/plain")
        self.send_header("Content-length", str(len(response)))
        self.end_headers()
        self.wfile.write(response)
        # shut down the connection
        self.wfile.flush()
        self.connection.shutdown(1)

    def testUser(self, username, password):
        """Return True when ``username``/``password`` are valid credentials.

        NOTE(review): this is a hard-coded demo check (test/test). Override it
        in a subclass with a real credential lookup before exposing the server.
        """
        return username == "test" and password == "test"
def test():
    """Run a demo XML-RPC server on localhost:8000 behind Basic authentication.

    Exposes the introspection functions and a single ``square`` method, then
    blocks serving requests forever.
    """
    address = ("localhost", 8000)
    rpc_server = SimpleXMLRPCServer.SimpleXMLRPCServer(
        address, AuthentificateXMLRPCRequestHandler)
    # Realm name the request handler reports in its 401 challenge.
    rpc_server.relam = "xmlrpc"

    rpc_server.register_introspection_functions()
    rpc_server.register_function(lambda x: x*x, 'square')

    rpc_server.serve_forever()
# Start the demo server only when this file is executed as a script.
if __name__ == '__main__':
    test()