summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPrabhu Ramachandran2011-11-24 19:09:42 +0530
committerPrabhu Ramachandran2011-11-24 19:09:42 +0530
commit2fd98c77cb06d078ad82f60abfe769cd7666e229 (patch)
tree33ce2cadc7c2e0610cb490df449d3a850c1ece28
parenta1b550e09475da1cdf345429df7708a7ae82d38f (diff)
downloadonline_test-2fd98c77cb06d078ad82f60abfe769cd7666e229.tar.gz
online_test-2fd98c77cb06d078ad82f60abfe769cd7666e229.tar.bz2
online_test-2fd98c77cb06d078ad82f60abfe769cd7666e229.zip
ENH: Creating a ServerPool for code checks
Changed the server so we use a pool of servers managed with a Queue of available servers. The XML/RPC client is also changed to handle connection failures gracefully. This is because XML/RPC cannot have more than 2 connections at a given time, so if there is an error, we wait for a random amount of time and try again. This allows us to handle fairly large loads nicely.
-rwxr-xr-xcode_server.py299
-rw-r--r--exam/xmlrpc_clients.py54
-rw-r--r--settings.py7
3 files changed, 225 insertions, 135 deletions
diff --git a/code_server.py b/code_server.py
index b9a5fdf..82a0f49 100755
--- a/code_server.py
+++ b/code_server.py
@@ -13,7 +13,10 @@ or::
$ sudo ./code_server.py 8001 8002 8003 8004 8005
# Runs 5 servers on ports specified.
-All these servers should be running as nobody.
+All these servers should be running as nobody. This will also start a server
+pool that defaults to port 50000 and is configurable in
+settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function
+that returns an available server.
"""
import sys
import traceback
@@ -23,11 +26,11 @@ import os
import stat
from os.path import isdir, dirname, abspath, join
import signal
-from multiprocessing import Process
+from multiprocessing import Process, Queue
import subprocess
# Local imports.
-from settings import SERVER_PORTS, SERVER_TIMEOUT
+from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT
MY_DIR = abspath(dirname(__file__))
@@ -38,8 +41,6 @@ def run_as_nobody():
os.setegid(nobody.pw_gid)
os.seteuid(nobody.pw_uid)
-################################################################################
-# Python related code.
# Raised when the code times-out.
# c.f. http://pguides.net/python/timeout-a-function
@@ -50,120 +51,191 @@ def timeout_handler(signum, frame):
"""A handler for the ALARM signal."""
raise TimeoutException('Code took too long to run.')
-def run_python_code(answer, test_code, in_dir=None):
- """Tests given Python function (`answer`) with the `test_code` supplied.
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original when
- done). This function also timesout when the function takes more than
- SERVER_TIMEOUT seconds to run to prevent runaway code.
- Returns
- -------
-
- A tuple: (success, error message).
-
+################################################################################
+# `CodeServer` class.
+################################################################################
+class CodeServer(object):
+ """A code server that executes user submitted test code, tests it and
+ reports if the code was correct or not.
"""
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
+ def __init__(self, port, queue):
+ self.port = port
+ self.queue = queue
+
+ def run_python_code(self, answer, test_code, in_dir=None):
+ """Tests given Python function (`answer`) with the `test_code` supplied.
+ If the optional `in_dir` keyword argument is supplied it changes the
+ directory to that directory (it does not change it back to the original when
+ done). This function also timesout when the function takes more than
+ SERVER_TIMEOUT seconds to run to prevent runaway code.
+
+ Returns
+ -------
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- success = False
- tb = None
- try:
- submitted = compile(answer, '<string>', mode='exec')
- g = {}
- exec submitted in g
- _tests = compile(test_code, '<string>', mode='exec')
- exec _tests in g
- except TimeoutException:
- err = 'Code took more than %s seconds to run.'%SERVER_TIMEOUT
- except AssertionError:
- type, value, tb = sys.exc_info()
- info = traceback.extract_tb(tb)
- fname, lineno, func, text = info[-1]
- text = str(test_code).splitlines()[lineno-1]
- err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- else:
- success = True
- err = 'Correct answer'
- finally:
- del tb
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
+ A tuple: (success, error message).
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- return success, err
-
-################################################################################
-# Run code for Bash.
-def run_bash_code(answer, test_code, in_dir=None):
-
- """Tests given Bash code (`answer`) with the `test_code` supplied. It
- assumes that there are two parts to the test_code separated by '#++++++'.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original when
- done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- def _set_exec(fname):
- os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR
- |stat.S_IRGRP|stat.S_IWGRP|stat.S_IXGRP
- |stat.S_IROTH|stat.S_IWOTH|stat.S_IXOTH)
-
- # XXX: fix this to not hardcode it to 6 +'s!
- reference, args = test_code.split('#++++++')
- ref_f = open('reference.sh', 'w')
- ref_f.write(reference); ref_f.close()
- ref_fname = abspath(ref_f.name)
- _set_exec(ref_fname)
- args_f = open('reference.args', 'w')
- args_f.write(args); args_f.close()
- _set_exec(args_f.name)
- submit_f = open('submit.sh', 'w')
- submit_f.write(answer); submit_f.close()
- submit_fname = abspath(submit_f.name)
- _set_exec(submit_fname)
-
- tester = join(MY_DIR, 'shell_script_tester.sh')
-
- # Run the shell code in a subprocess.
- try:
- output = subprocess.check_output([tester, ref_fname, submit_fname],
- stderr=subprocess.STDOUT)
- except subprocess.CalledProcessError, exc:
+ """
+ if in_dir is not None and isdir(in_dir):
+ os.chdir(in_dir)
+
+ # Add a new signal handler for the execution of this code.
+ old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+ signal.alarm(SERVER_TIMEOUT)
+
success = False
- err = 'Error: exist status: %d, message: %s'%(exc.returncode,
- exc.output)
- else:
- success = True
- err = 'Correct answer'
+ tb = None
+ try:
+ submitted = compile(answer, '<string>', mode='exec')
+ g = {}
+ exec submitted in g
+ _tests = compile(test_code, '<string>', mode='exec')
+ exec _tests in g
+ except TimeoutException:
+ err = 'Code took more than %s seconds to run.'%SERVER_TIMEOUT
+ except AssertionError:
+ type, value, tb = sys.exc_info()
+ info = traceback.extract_tb(tb)
+ fname, lineno, func, text = info[-1]
+ text = str(test_code).splitlines()[lineno-1]
+ err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ else:
+ success = True
+ err = 'Correct answer'
+ finally:
+ del tb
+ # Set back any original signal handler.
+ signal.signal(signal.SIGALRM, old_handler)
+
+ # Cancel the signal if any, see signal.alarm documentation.
+ signal.alarm(0)
+
+ # Put us back into the server pool queue since we are free now.
+ self.queue.put(self.port)
+
+ return success, err
+
+ def run_bash_code(self, answer, test_code, in_dir=None):
+
+ """Tests given Bash code (`answer`) with the `test_code` supplied. It
+ assumes that there are two parts to the test_code separated by '#++++++'.
+
+ If the optional `in_dir` keyword argument is supplied it changes the
+ directory to that directory (it does not change it back to the original when
+ done).
+
+ Returns
+ -------
+
+ A tuple: (success, error message).
+
+ """
+ if in_dir is not None and isdir(in_dir):
+ os.chdir(in_dir)
+
+ def _set_exec(fname):
+ os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR
+ |stat.S_IRGRP|stat.S_IWGRP|stat.S_IXGRP
+ |stat.S_IROTH|stat.S_IWOTH|stat.S_IXOTH)
+
+ # XXX: fix this to not hardcode it to 6 +'s!
+ reference, args = test_code.split('#++++++')
+ ref_f = open('reference.sh', 'w')
+ ref_f.write(reference); ref_f.close()
+ ref_fname = abspath(ref_f.name)
+ _set_exec(ref_fname)
+ args_f = open('reference.args', 'w')
+ args_f.write(args); args_f.close()
+ _set_exec(args_f.name)
+ submit_f = open('submit.sh', 'w')
+ submit_f.write(answer); submit_f.close()
+ submit_fname = abspath(submit_f.name)
+ _set_exec(submit_fname)
+
+ tester = join(MY_DIR, 'shell_script_tester.sh')
+
+ # Run the shell code in a subprocess.
+ try:
+ output = subprocess.check_output([tester, ref_fname, submit_fname],
+ stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError, exc:
+ success = False
+ err = 'Error: exist status: %d, message: %s'%(exc.returncode,
+ exc.output)
+ else:
+ success = True
+ err = 'Correct answer'
+
+ # Put us back into the server pool queue since we are free now.
+ self.queue.put(self.port)
+ return success, err
+
+ def run(self):
+ server = SimpleXMLRPCServer(("localhost", self.port))
+ self.server = server
+ server.register_instance(self)
+ self.queue.put(self.port)
+ server.serve_forever()
- return success, err
+################################################################################
+# `ServerPool` class.
+################################################################################
+class ServerPool(object):
+ """Manages a pool of CodeServer objects."""
+ def __init__(self, ports, pool_port=50000):
+ """Create a pool of servers. Uses a shared Queue to get available
+ servers.
+
+ Parameters
+ ----------
+
+ ports : list(int)
+ List of ports at which the CodeServer's should run.
+
+ pool_port : int
+ Port at which the server pool should serve.
+ """
+ self.my_port = pool_port
+ self.ports = ports
+ queue = Queue(maxsize=len(ports))
+ self.queue = queue
+ servers = []
+ for port in ports:
+ server = CodeServer(port, queue)
+ servers.append(server)
+ p = Process(target=server.run)
+ p.start()
+ self.servers = servers
+
+ def get_server_port(self):
+ """Get available server port from ones in the pool. This will block
+ till it gets an available server.
+ """
+ q = self.queue
+ was_waiting = True if q.empty() else False
+ port = q.get()
+ if was_waiting:
+ print '*'*80
+ print "No available servers, was waiting but got server later at %d."%port
+ print '*'*80
+ sys.stdout.flush()
+ return port
+
+ def run(self):
+ """Run server which returns an available server port where code
+ can be executed.
+ """
+ server = SimpleXMLRPCServer(("localhost", self.my_port))
+ self.server = server
+ server.register_instance(self)
+ server.serve_forever()
-def run_server(port):
- server = SimpleXMLRPCServer(("localhost", port))
- server.register_function(run_python_code)
- server.register_function(run_bash_code)
- server.serve_forever()
+################################################################################
def main():
run_as_nobody()
if len(sys.argv) == 1:
@@ -171,9 +243,8 @@ def main():
else:
ports = [int(x) for x in sys.argv[1:]]
- for port in ports:
- p = Process(target=run_server, args=(port,))
- p.start()
-
+ server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT)
+ server_pool.run()
+
if __name__ == '__main__':
main()
diff --git a/exam/xmlrpc_clients.py b/exam/xmlrpc_clients.py
index 01172d7..817e37d 100644
--- a/exam/xmlrpc_clients.py
+++ b/exam/xmlrpc_clients.py
@@ -1,17 +1,24 @@
from xmlrpclib import ServerProxy
-from settings import SERVER_PORTS
+import time
import random
import socket
+from settings import SERVER_PORTS, SERVER_POOL_PORT
-class CodeServer(object):
+
+class ConnectionError(Exception):
+ pass
+
+################################################################################
+# `CodeServerProxy` class.
+################################################################################
+class CodeServerProxy(object):
"""A class that manages accesing the farm of Python servers and making
calls to them such that no one XMLRPC server is overloaded.
"""
def __init__(self):
- servers = [ServerProxy('http://localhost:%d'%(x)) for x in SERVER_PORTS]
- self.servers = servers
- self.indices = range(len(SERVER_PORTS))
+ pool_url = 'http://localhost:%d'%(SERVER_POOL_PORT)
+ self.pool_server = ServerProxy(pool_url)
self.methods = {"python": 'run_python_code',
"bash": 'run_bash_code'}
@@ -38,27 +45,34 @@ class CodeServer(object):
A tuple: (success, error message).
"""
method_name = self.methods[language]
- done = False
- result = [False, 'Unable to connect to any code servers!']
- # Try to connect a few times if not, quit.
- count = 5
- while (not done) and (count > 0):
+ try:
+ server = self._get_server()
+ method = getattr(server, method_name)
+ result = method(answer, test_code, user_dir)
+ except ConnectionError:
+ result = [False, 'Unable to connect to any code servers!']
+ return result
+
+ def _get_server(self):
+ # Get a suitable server from our pool of servers. This may block. We
+ # try about 60 times, essentially waiting at most for about 30 seconds.
+ done, count = False, 60
+
+ while not done and count > 0:
try:
- server = self._get_server()
- method = getattr(server, method_name)
- result = method(answer, test_code, user_dir)
+ port = self.pool_server.get_server_port()
except socket.error:
+ # Wait a while try again.
+ time.sleep(random.random())
count -= 1
else:
done = True
- return result
-
- def _get_server(self):
- # pick a suitable server at random from our pool of servers.
- index = random.choice(self.indices)
- return self.servers[index]
+ if not done:
+ raise ConnectionError("Couldn't connect to a server!")
+ proxy = ServerProxy('http://localhost:%d'%port)
+ return proxy
# views.py calls this Python server which forwards the request to one
# of the running servers.
-code_server = CodeServer()
+code_server = CodeServerProxy()
diff --git a/settings.py b/settings.py
index ebfec30..c007867 100644
--- a/settings.py
+++ b/settings.py
@@ -5,10 +5,15 @@ from os.path import dirname, join, basename, abspath
DEBUG = True
TEMPLATE_DEBUG = DEBUG
-# The ports the Python server should run on. This will run one separate
+# The ports the code server should run on. This will run one separate
# server for each port listed in the following list.
SERVER_PORTS = [8001] # range(8001, 8026)
+# The server pool port. This is the server which returns available server
+# ports so as to minimize load. This is some random number where no other
+# service is running. It should be > 1024 and less < 65535 though.
+SERVER_POOL_PORT = 53579
+
# Timeout for the code to run in seconds. This is an integer!
SERVER_TIMEOUT = 2