diff options
author | Prabhu Ramachandran | 2011-11-24 19:09:42 +0530 |
---|---|---|
committer | Prabhu Ramachandran | 2011-11-24 19:09:42 +0530 |
commit | 2fd98c77cb06d078ad82f60abfe769cd7666e229 (patch) | |
tree | 33ce2cadc7c2e0610cb490df449d3a850c1ece28 | |
parent | a1b550e09475da1cdf345429df7708a7ae82d38f (diff) | |
download | online_test-2fd98c77cb06d078ad82f60abfe769cd7666e229.tar.gz online_test-2fd98c77cb06d078ad82f60abfe769cd7666e229.tar.bz2 online_test-2fd98c77cb06d078ad82f60abfe769cd7666e229.zip |
ENH: Creating a ServerPool for code checks
Changed the server so we use a pool of servers managed with a Queue of
available servers. The XML/RPC client is also changed to handle
connection failures gracefully. This is because XML/RPC cannot have
more than 2 connections at a given time, so if there is an error, we
wait for a random amount of time and try again. This allows us to
handle fairly large loads nicely.
-rwxr-xr-x | code_server.py | 299 | ||||
-rw-r--r-- | exam/xmlrpc_clients.py | 54 | ||||
-rw-r--r-- | settings.py | 7 |
3 files changed, 225 insertions, 135 deletions
diff --git a/code_server.py b/code_server.py index b9a5fdf..82a0f49 100755 --- a/code_server.py +++ b/code_server.py @@ -13,7 +13,10 @@ or:: $ sudo ./code_server.py 8001 8002 8003 8004 8005 # Runs 5 servers on ports specified. -All these servers should be running as nobody. +All these servers should be running as nobody. This will also start a server +pool that defaults to port 50000 and is configurable in +settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function +that returns an available server. """ import sys import traceback @@ -23,11 +26,11 @@ import os import stat from os.path import isdir, dirname, abspath, join import signal -from multiprocessing import Process +from multiprocessing import Process, Queue import subprocess # Local imports. -from settings import SERVER_PORTS, SERVER_TIMEOUT +from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT MY_DIR = abspath(dirname(__file__)) @@ -38,8 +41,6 @@ def run_as_nobody(): os.setegid(nobody.pw_gid) os.seteuid(nobody.pw_uid) -################################################################################ -# Python related code. # Raised when the code times-out. # c.f. http://pguides.net/python/timeout-a-function @@ -50,120 +51,191 @@ def timeout_handler(signum, frame): """A handler for the ALARM signal.""" raise TimeoutException('Code took too long to run.') -def run_python_code(answer, test_code, in_dir=None): - """Tests given Python function (`answer`) with the `test_code` supplied. - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original when - done). This function also timesout when the function takes more than - SERVER_TIMEOUT seconds to run to prevent runaway code. - Returns - ------- - - A tuple: (success, error message). - +################################################################################ +# `CodeServer` class. +################################################################################ +class CodeServer(object): + """A code server that executes user submitted test code, tests it and + reports if the code was correct or not. """ - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) + def __init__(self, port, queue): + self.port = port + self.queue = queue + + def run_python_code(self, answer, test_code, in_dir=None): + """Tests given Python function (`answer`) with the `test_code` supplied. + If the optional `in_dir` keyword argument is supplied it changes the + directory to that directory (it does not change it back to the original when + done). This function also timesout when the function takes more than + SERVER_TIMEOUT seconds to run to prevent runaway code. + + Returns + ------- - # Add a new signal handler for the execution of this code. - old_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(SERVER_TIMEOUT) - - success = False - tb = None - try: - submitted = compile(answer, '<string>', mode='exec') - g = {} - exec submitted in g - _tests = compile(test_code, '<string>', mode='exec') - exec _tests in g - except TimeoutException: - err = 'Code took more than %s seconds to run.'%SERVER_TIMEOUT - except AssertionError: - type, value, tb = sys.exc_info() - info = traceback.extract_tb(tb) - fname, lineno, func, text = info[-1] - text = str(test_code).splitlines()[lineno-1] - err = "{0} {1} in: {2}".format(type.__name__, str(value), text) - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - else: - success = True - err = 'Correct answer' - finally: - del tb - # Set back any original signal handler. - signal.signal(signal.SIGALRM, old_handler) + A tuple: (success, error message). - # Cancel the signal if any, see signal.alarm documentation. - signal.alarm(0) - - return success, err - -################################################################################ -# Run code for Bash. -def run_bash_code(answer, test_code, in_dir=None): - - """Tests given Bash code (`answer`) with the `test_code` supplied. It - assumes that there are two parts to the test_code separated by '#++++++'. - - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original when - done). - - Returns - ------- - - A tuple: (success, error message). - - """ - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) - - def _set_exec(fname): - os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR - |stat.S_IRGRP|stat.S_IWGRP|stat.S_IXGRP - |stat.S_IROTH|stat.S_IWOTH|stat.S_IXOTH) - - # XXX: fix this to not hardcode it to 6 +'s! - reference, args = test_code.split('#++++++') - ref_f = open('reference.sh', 'w') - ref_f.write(reference); ref_f.close() - ref_fname = abspath(ref_f.name) - _set_exec(ref_fname) - args_f = open('reference.args', 'w') - args_f.write(args); args_f.close() - _set_exec(args_f.name) - submit_f = open('submit.sh', 'w') - submit_f.write(answer); submit_f.close() - submit_fname = abspath(submit_f.name) - _set_exec(submit_fname) - - tester = join(MY_DIR, 'shell_script_tester.sh') - - # Run the shell code in a subprocess. - try: - output = subprocess.check_output([tester, ref_fname, submit_fname], - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError, exc: + """ + if in_dir is not None and isdir(in_dir): + os.chdir(in_dir) + + # Add a new signal handler for the execution of this code. + old_handler = signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(SERVER_TIMEOUT) + success = False - err = 'Error: exist status: %d, message: %s'%(exc.returncode, - exc.output) - else: - success = True - err = 'Correct answer' + tb = None + try: + submitted = compile(answer, '<string>', mode='exec') + g = {} + exec submitted in g + _tests = compile(test_code, '<string>', mode='exec') + exec _tests in g + except TimeoutException: + err = 'Code took more than %s seconds to run.'%SERVER_TIMEOUT + except AssertionError: + type, value, tb = sys.exc_info() + info = traceback.extract_tb(tb) + fname, lineno, func, text = info[-1] + text = str(test_code).splitlines()[lineno-1] + err = "{0} {1} in: {2}".format(type.__name__, str(value), text) + except: + type, value = sys.exc_info()[:2] + err = "Error: {0}".format(repr(value)) + else: + success = True + err = 'Correct answer' + finally: + del tb + # Set back any original signal handler. + signal.signal(signal.SIGALRM, old_handler) + + # Cancel the signal if any, see signal.alarm documentation. + signal.alarm(0) + + # Put us back into the server pool queue since we are free now. + self.queue.put(self.port) + + return success, err + + def run_bash_code(self, answer, test_code, in_dir=None): + + """Tests given Bash code (`answer`) with the `test_code` supplied. It + assumes that there are two parts to the test_code separated by '#++++++'. + + If the optional `in_dir` keyword argument is supplied it changes the + directory to that directory (it does not change it back to the original when + done). + + Returns + ------- + + A tuple: (success, error message). + + """ + if in_dir is not None and isdir(in_dir): + os.chdir(in_dir) + + def _set_exec(fname): + os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR + |stat.S_IRGRP|stat.S_IWGRP|stat.S_IXGRP + |stat.S_IROTH|stat.S_IWOTH|stat.S_IXOTH) + + # XXX: fix this to not hardcode it to 6 +'s! + reference, args = test_code.split('#++++++') + ref_f = open('reference.sh', 'w') + ref_f.write(reference); ref_f.close() + ref_fname = abspath(ref_f.name) + _set_exec(ref_fname) + args_f = open('reference.args', 'w') + args_f.write(args); args_f.close() + _set_exec(args_f.name) + submit_f = open('submit.sh', 'w') + submit_f.write(answer); submit_f.close() + submit_fname = abspath(submit_f.name) + _set_exec(submit_fname) + + tester = join(MY_DIR, 'shell_script_tester.sh') + + # Run the shell code in a subprocess. + try: + output = subprocess.check_output([tester, ref_fname, submit_fname], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError, exc: + success = False + err = 'Error: exist status: %d, message: %s'%(exc.returncode, + exc.output) + else: + success = True + err = 'Correct answer' + + # Put us back into the server pool queue since we are free now. + self.queue.put(self.port) + return success, err + + def run(self): + server = SimpleXMLRPCServer(("localhost", self.port)) + self.server = server + server.register_instance(self) + self.queue.put(self.port) + server.serve_forever() - return success, err +################################################################################ +# `ServerPool` class. +################################################################################ +class ServerPool(object): + """Manages a pool of CodeServer objects.""" + def __init__(self, ports, pool_port=50000): + """Create a pool of servers. Uses a shared Queue to get available + servers. + + Parameters + ---------- + + ports : list(int) + List of ports at which the CodeServer's should run. + + pool_port : int + Port at which the server pool should serve. + """ + self.my_port = pool_port + self.ports = ports + queue = Queue(maxsize=len(ports)) + self.queue = queue + servers = [] + for port in ports: + server = CodeServer(port, queue) + servers.append(server) + p = Process(target=server.run) + p.start() + self.servers = servers + + def get_server_port(self): + """Get available server port from ones in the pool. This will block + till it gets an available server. + """ + q = self.queue + was_waiting = True if q.empty() else False + port = q.get() + if was_waiting: + print '*'*80 + print "No available servers, was waiting but got server later at %d."%port + print '*'*80 + sys.stdout.flush() + return port + + def run(self): + """Run server which returns an available server port where code + can be executed. + """ + server = SimpleXMLRPCServer(("localhost", self.my_port)) + self.server = server + server.register_instance(self) + server.serve_forever() -def run_server(port): - server = SimpleXMLRPCServer(("localhost", port)) - server.register_function(run_python_code) - server.register_function(run_bash_code) - server.serve_forever() +################################################################################ def main(): run_as_nobody() if len(sys.argv) == 1: @@ -171,9 +243,8 @@ def main(): else: ports = [int(x) for x in sys.argv[1:]] - for port in ports: - p = Process(target=run_server, args=(port,)) - p.start() - + server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT) + server_pool.run() + if __name__ == '__main__': main() diff --git a/exam/xmlrpc_clients.py b/exam/xmlrpc_clients.py index 01172d7..817e37d 100644 --- a/exam/xmlrpc_clients.py +++ b/exam/xmlrpc_clients.py @@ -1,17 +1,24 @@ from xmlrpclib import ServerProxy -from settings import SERVER_PORTS +import time import random import socket +from settings import SERVER_PORTS, SERVER_POOL_PORT -class CodeServer(object): + +class ConnectionError(Exception): + pass + +################################################################################ +# `CodeServerProxy` class. +################################################################################ +class CodeServerProxy(object): """A class that manages accesing the farm of Python servers and making calls to them such that no one XMLRPC server is overloaded. """ def __init__(self): - servers = [ServerProxy('http://localhost:%d'%(x)) for x in SERVER_PORTS] - self.servers = servers - self.indices = range(len(SERVER_PORTS)) + pool_url = 'http://localhost:%d'%(SERVER_POOL_PORT) + self.pool_server = ServerProxy(pool_url) self.methods = {"python": 'run_python_code', "bash": 'run_bash_code'} @@ -38,27 +45,34 @@ class CodeServer(object): A tuple: (success, error message). """ method_name = self.methods[language] - done = False - result = [False, 'Unable to connect to any code servers!'] - # Try to connect a few times if not, quit. - count = 5 - while (not done) and (count > 0): + try: + server = self._get_server() + method = getattr(server, method_name) + result = method(answer, test_code, user_dir) + except ConnectionError: + result = [False, 'Unable to connect to any code servers!'] + return result + + def _get_server(self): + # Get a suitable server from our pool of servers. This may block. We + # try about 60 times, essentially waiting at most for about 30 seconds. + done, count = False, 60 + + while not done and count > 0: try: - server = self._get_server() - method = getattr(server, method_name) - result = method(answer, test_code, user_dir) + port = self.pool_server.get_server_port() except socket.error: + # Wait a while try again. + time.sleep(random.random()) count -= 1 else: done = True - return result - - def _get_server(self): - # pick a suitable server at random from our pool of servers. - index = random.choice(self.indices) - return self.servers[index] + if not done: + raise ConnectionError("Couldn't connect to a server!") + proxy = ServerProxy('http://localhost:%d'%port) + return proxy # views.py calls this Python server which forwards the request to one # of the running servers. -code_server = CodeServer() +code_server = CodeServerProxy() diff --git a/settings.py b/settings.py index ebfec30..c007867 100644 --- a/settings.py +++ b/settings.py @@ -5,10 +5,15 @@ from os.path import dirname, join, basename, abspath DEBUG = True TEMPLATE_DEBUG = DEBUG -# The ports the Python server should run on. This will run one separate +# The ports the code server should run on. This will run one separate # server for each port listed in the following list. SERVER_PORTS = [8001] # range(8001, 8026) +# The server pool port. This is the server which returns available server +# ports so as to minimize load. This is some random number where no other +# service is running. It should be > 1024 and less < 65535 though. +SERVER_POOL_PORT = 53579 + # Timeout for the code to run in seconds. This is an integer! SERVER_TIMEOUT = 2 |