diff options
author | Prabhu Ramachandran | 2011-11-24 19:09:42 +0530 |
---|---|---|
committer | Prabhu Ramachandran | 2011-11-24 19:09:42 +0530 |
commit | 2fd98c77cb06d078ad82f60abfe769cd7666e229 (patch) | |
tree | 33ce2cadc7c2e0610cb490df449d3a850c1ece28 /code_server.py | |
parent | a1b550e09475da1cdf345429df7708a7ae82d38f (diff) | |
download | online_test-2fd98c77cb06d078ad82f60abfe769cd7666e229.tar.gz online_test-2fd98c77cb06d078ad82f60abfe769cd7666e229.tar.bz2 online_test-2fd98c77cb06d078ad82f60abfe769cd7666e229.zip |
ENH: Creating a ServerPool for code checks
Changed the server so we use a pool of servers managed with a Queue of
available servers. The XML/RPC client is also changed to handle
connection failures gracefully. This is because XML/RPC cannot have
more than 2 connections at a given time, so if there is an error, we
wait for a random amount of time and try again. This allows us to
handle fairly large loads nicely.
Diffstat (limited to 'code_server.py')
-rwxr-xr-x | code_server.py | 299 |
1 files changed, 185 insertions, 114 deletions
diff --git a/code_server.py b/code_server.py index b9a5fdf..82a0f49 100755 --- a/code_server.py +++ b/code_server.py @@ -13,7 +13,10 @@ or:: $ sudo ./code_server.py 8001 8002 8003 8004 8005 # Runs 5 servers on ports specified. -All these servers should be running as nobody. +All these servers should be running as nobody. This will also start a server +pool that defaults to port 50000 and is configurable in +settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function +that returns an available server. """ import sys import traceback @@ -23,11 +26,11 @@ import os import stat from os.path import isdir, dirname, abspath, join import signal -from multiprocessing import Process +from multiprocessing import Process, Queue import subprocess # Local imports. -from settings import SERVER_PORTS, SERVER_TIMEOUT +from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT MY_DIR = abspath(dirname(__file__)) @@ -38,8 +41,6 @@ def run_as_nobody(): os.setegid(nobody.pw_gid) os.seteuid(nobody.pw_uid) -################################################################################ -# Python related code. # Raised when the code times-out. # c.f. http://pguides.net/python/timeout-a-function @@ -50,120 +51,191 @@ def timeout_handler(signum, frame): """A handler for the ALARM signal.""" raise TimeoutException('Code took too long to run.') -def run_python_code(answer, test_code, in_dir=None): - """Tests given Python function (`answer`) with the `test_code` supplied. - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original when - done). This function also timesout when the function takes more than - SERVER_TIMEOUT seconds to run to prevent runaway code. - Returns - ------- - - A tuple: (success, error message). - +################################################################################ +# `CodeServer` class. +################################################################################ +class CodeServer(object): + """A code server that executes user submitted test code, tests it and + reports if the code was correct or not. """ - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) + def __init__(self, port, queue): + self.port = port + self.queue = queue + + def run_python_code(self, answer, test_code, in_dir=None): + """Tests given Python function (`answer`) with the `test_code` supplied. + If the optional `in_dir` keyword argument is supplied it changes the + directory to that directory (it does not change it back to the original when + done). This function also timesout when the function takes more than + SERVER_TIMEOUT seconds to run to prevent runaway code. + + Returns + ------- - # Add a new signal handler for the execution of this code. - old_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(SERVER_TIMEOUT) - - success = False - tb = None - try: - submitted = compile(answer, '<string>', mode='exec') - g = {} - exec submitted in g - _tests = compile(test_code, '<string>', mode='exec') - exec _tests in g - except TimeoutException: - err = 'Code took more than %s seconds to run.'%SERVER_TIMEOUT - except AssertionError: - type, value, tb = sys.exc_info() - info = traceback.extract_tb(tb) - fname, lineno, func, text = info[-1] - text = str(test_code).splitlines()[lineno-1] - err = "{0} {1} in: {2}".format(type.__name__, str(value), text) - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - else: - success = True - err = 'Correct answer' - finally: - del tb - # Set back any original signal handler. - signal.signal(signal.SIGALRM, old_handler) + A tuple: (success, error message). - # Cancel the signal if any, see signal.alarm documentation. - signal.alarm(0) - - return success, err - -################################################################################ -# Run code for Bash. -def run_bash_code(answer, test_code, in_dir=None): - - """Tests given Bash code (`answer`) with the `test_code` supplied. It - assumes that there are two parts to the test_code separated by '#++++++'. - - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original when - done). - - Returns - ------- - - A tuple: (success, error message). - - """ - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) - - def _set_exec(fname): - os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR - |stat.S_IRGRP|stat.S_IWGRP|stat.S_IXGRP - |stat.S_IROTH|stat.S_IWOTH|stat.S_IXOTH) - - # XXX: fix this to not hardcode it to 6 +'s! - reference, args = test_code.split('#++++++') - ref_f = open('reference.sh', 'w') - ref_f.write(reference); ref_f.close() - ref_fname = abspath(ref_f.name) - _set_exec(ref_fname) - args_f = open('reference.args', 'w') - args_f.write(args); args_f.close() - _set_exec(args_f.name) - submit_f = open('submit.sh', 'w') - submit_f.write(answer); submit_f.close() - submit_fname = abspath(submit_f.name) - _set_exec(submit_fname) - - tester = join(MY_DIR, 'shell_script_tester.sh') - - # Run the shell code in a subprocess. - try: - output = subprocess.check_output([tester, ref_fname, submit_fname], - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError, exc: + """ + if in_dir is not None and isdir(in_dir): + os.chdir(in_dir) + + # Add a new signal handler for the execution of this code. + old_handler = signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(SERVER_TIMEOUT) + success = False - err = 'Error: exist status: %d, message: %s'%(exc.returncode, - exc.output) - else: - success = True - err = 'Correct answer' + tb = None + try: + submitted = compile(answer, '<string>', mode='exec') + g = {} + exec submitted in g + _tests = compile(test_code, '<string>', mode='exec') + exec _tests in g + except TimeoutException: + err = 'Code took more than %s seconds to run.'%SERVER_TIMEOUT + except AssertionError: + type, value, tb = sys.exc_info() + info = traceback.extract_tb(tb) + fname, lineno, func, text = info[-1] + text = str(test_code).splitlines()[lineno-1] + err = "{0} {1} in: {2}".format(type.__name__, str(value), text) + except: + type, value = sys.exc_info()[:2] + err = "Error: {0}".format(repr(value)) + else: + success = True + err = 'Correct answer' + finally: + del tb + # Set back any original signal handler. + signal.signal(signal.SIGALRM, old_handler) + + # Cancel the signal if any, see signal.alarm documentation. + signal.alarm(0) + + # Put us back into the server pool queue since we are free now. + self.queue.put(self.port) + + return success, err + + def run_bash_code(self, answer, test_code, in_dir=None): + + """Tests given Bash code (`answer`) with the `test_code` supplied. It + assumes that there are two parts to the test_code separated by '#++++++'. + + If the optional `in_dir` keyword argument is supplied it changes the + directory to that directory (it does not change it back to the original when + done). + + Returns + ------- + + A tuple: (success, error message). + + """ + if in_dir is not None and isdir(in_dir): + os.chdir(in_dir) + + def _set_exec(fname): + os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR + |stat.S_IRGRP|stat.S_IWGRP|stat.S_IXGRP + |stat.S_IROTH|stat.S_IWOTH|stat.S_IXOTH) + + # XXX: fix this to not hardcode it to 6 +'s! + reference, args = test_code.split('#++++++') + ref_f = open('reference.sh', 'w') + ref_f.write(reference); ref_f.close() + ref_fname = abspath(ref_f.name) + _set_exec(ref_fname) + args_f = open('reference.args', 'w') + args_f.write(args); args_f.close() + _set_exec(args_f.name) + submit_f = open('submit.sh', 'w') + submit_f.write(answer); submit_f.close() + submit_fname = abspath(submit_f.name) + _set_exec(submit_fname) + + tester = join(MY_DIR, 'shell_script_tester.sh') + + # Run the shell code in a subprocess. + try: + output = subprocess.check_output([tester, ref_fname, submit_fname], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError, exc: + success = False + err = 'Error: exist status: %d, message: %s'%(exc.returncode, + exc.output) + else: + success = True + err = 'Correct answer' + + # Put us back into the server pool queue since we are free now. + self.queue.put(self.port) + return success, err + + def run(self): + server = SimpleXMLRPCServer(("localhost", self.port)) + self.server = server + server.register_instance(self) + self.queue.put(self.port) + server.serve_forever() - return success, err +################################################################################ +# `ServerPool` class. +################################################################################ +class ServerPool(object): + """Manages a pool of CodeServer objects.""" + def __init__(self, ports, pool_port=50000): + """Create a pool of servers. Uses a shared Queue to get available + servers. + + Parameters + ---------- + + ports : list(int) + List of ports at which the CodeServer's should run. + + pool_port : int + Port at which the server pool should serve. + """ + self.my_port = pool_port + self.ports = ports + queue = Queue(maxsize=len(ports)) + self.queue = queue + servers = [] + for port in ports: + server = CodeServer(port, queue) + servers.append(server) + p = Process(target=server.run) + p.start() + self.servers = servers + + def get_server_port(self): + """Get available server port from ones in the pool. This will block + till it gets an available server. + """ + q = self.queue + was_waiting = True if q.empty() else False + port = q.get() + if was_waiting: + print '*'*80 + print "No available servers, was waiting but got server later at %d."%port + print '*'*80 + sys.stdout.flush() + return port + + def run(self): + """Run server which returns an available server port where code + can be executed. + """ + server = SimpleXMLRPCServer(("localhost", self.my_port)) + self.server = server + server.register_instance(self) + server.serve_forever() -def run_server(port): - server = SimpleXMLRPCServer(("localhost", port)) - server.register_function(run_python_code) - server.register_function(run_bash_code) - server.serve_forever() +################################################################################ def main(): run_as_nobody() if len(sys.argv) == 1: @@ -171,9 +243,8 @@ def main(): else: ports = [int(x) for x in sys.argv[1:]] - for port in ports: - p = Process(target=run_server, args=(port,)) - p.start() - + server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT) + server_pool.run() + if __name__ == '__main__': main() |