diff options
author | Prabhu Ramachandran | 2016-08-12 21:01:08 +0530 |
---|---|---|
committer | Prabhu Ramachandran | 2016-08-12 21:01:08 +0530 |
commit | b39eb538c08d4dd6761d6a970aadf58bde58a7ce (patch) | |
tree | 6732cb4c0f89ffc80b7b9aa60d54e2d3ff5c70f0 /yaksh | |
parent | 3738c8fefa8ac69508bb6daeee045c1f5ea0cb17 (diff) | |
download | online_test-b39eb538c08d4dd6761d6a970aadf58bde58a7ce.tar.gz online_test-b39eb538c08d4dd6761d6a970aadf58bde58a7ce.tar.bz2 online_test-b39eb538c08d4dd6761d6a970aadf58bde58a7ce.zip |
Use a tornado based server for the pool server.
With the previous XMLRPC based server, an XMLRPC server would respond to
a request for an available port. This does not work as the server can
only take about 2 simultaneous connections. The server pool now uses a
HTTP server via tornado which works extremely well. The django code
should not change at all as this is an internal change. This change
should make the code server far more robust and work for a very large
number of simultaneous users.
The http server also has a simple status page to indicate the current
load. This will not be correct on OSX due to limitations of the
multi-processing Queue implementation on OSX.
Diffstat (limited to 'yaksh')
-rwxr-xr-x | yaksh/code_server.py | 114 | ||||
-rw-r--r-- | yaksh/tests/test_code_server.py | 39 | ||||
-rw-r--r-- | yaksh/xmlrpc_clients.py | 21 |
3 files changed, 112 insertions, 62 deletions
diff --git a/yaksh/code_server.py b/yaksh/code_server.py index a2cd08a..e19e9c8 100755 --- a/yaksh/code_server.py +++ b/yaksh/code_server.py @@ -1,9 +1,11 @@ #!/usr/bin/env python -"""This server runs an XMLRPC server that can be submitted code and tests -and returns the output. It *should* be run as root and will run as the user -'nobody' so as to minimize any damange by errant code. This can be configured -by editing settings.py to run as many servers as desired. One can also -specify the ports on the command line. Here are examples:: + +"""This server runs an HTTP server (using tornado) and several code servers +using XMLRPC that can be submitted code +and tests and returns the output. It *should* be run as root and will run as +the user 'nobody' so as to minimize any damange by errant code. This can be +configured by editing settings.py to run as many servers as desired. One can +also specify the ports on the command line. Here are examples:: $ sudo ./code_server.py # Runs servers based on settings.py:SERVER_PORTS one server per port given. @@ -17,18 +19,32 @@ All these servers should be running as nobody. This will also start a server pool that defaults to port 50000 and is configurable in settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function that returns an available server. + """ -import sys + +# Standard library imports from SimpleXMLRPCServer import SimpleXMLRPCServer -import pwd +import json +from multiprocessing import Process, Queue import os -import stat from os.path import isdir, dirname, abspath, join, isfile +import pwd +import re import signal -from multiprocessing import Process, Queue +import stat import subprocess -import re -import json +import sys + +try: + from urllib import unquote +except ImportError: + # The above import will not work on Python-3.x. + from urllib.parse import unquote + +# Library imports +from tornado.ioloop import IOLoop +from tornado.web import Application, RequestHandler + # Local imports. from settings import SERVER_PORTS, SERVER_POOL_PORT from language_registry import create_evaluator_instance, unpack_json @@ -104,17 +120,30 @@ class ServerPool(object): """ self.my_port = pool_port self.ports = ports - queue = Queue(maxsize=len(ports)) + queue = Queue(maxsize=len(self.ports)) self.queue = queue servers = [] - self.processes = [] - for port in ports: + processes = [] + for port in self.ports: server = CodeServer(port, queue) servers.append(server) p = Process(target=server.run) - p.start() - self.processes.append(p) + processes.append(p) self.servers = servers + self.processes = processes + self.app = self._make_app() + + def _make_app(self): + app = Application([ + (r"/.*", MainHandler, dict(server=self)), + ]) + app.listen(self.my_port) + return app + + def _start_code_servers(self): + for proc in self.processes: + if proc.pid is None: + proc.start() # Public Protocol ########## @@ -122,42 +151,63 @@ class ServerPool(object): """Get available server port from ones in the pool. This will block till it gets an available server. """ - q = self.queue - was_waiting = True if q.empty() else False - port = q.get() - if was_waiting: - print '*'*80 - print "No available servers, was waiting but got server \ - later at %d." % port - print '*'*80 - sys.stdout.flush() - return port + return self.queue.get() + + def get_status(self): + """Returns current queue size and total number of ports used.""" + try: + qs = self.queue.qsize() + except NotImplementedError: + # May not work on OS X so we return a dummy. + qs = len(self.ports) + + return qs, len(self.ports) def run(self): """Run server which returns an available server port where code can be executed. """ - server = SimpleXMLRPCServer(("0.0.0.0", self.my_port)) - self.server = server - server.register_instance(self) - server.serve_forever() + # We start the code servers here to ensure they are run as nobody. + self._start_code_servers() + IOLoop.current().start() def stop(self): """Stop all the code server processes. """ for proc in self.processes: proc.terminate() + IOLoop.current().stop() + + +class MainHandler(RequestHandler): + def initialize(self, server): + self.server = server + + def get(self): + path = self.request.path[1:] + if len(path) == 0: + port = self.server.get_server_port() + self.write(str(port)) + elif path == "status": + q_size, total = self.server.get_status() + result = "%d servers out of %d are free.\n"%(q_size, total) + load = float(total - q_size)/total*100 + result += "Load: %s%%\n"%load + self.write(result) ############################################################################### def main(args=None): - run_as_nobody() if args: - ports = [int(x) for x in args[1:]] + ports = [int(x) for x in args] else: ports = SERVER_PORTS server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT) + # This is done *after* the server pool is created because when the tornado + # app calls listen(), it cannot be nobody. + run_as_nobody() + server_pool.run() if __name__ == '__main__': diff --git a/yaksh/tests/test_code_server.py b/yaksh/tests/test_code_server.py index 18510c6..a73f073 100644 --- a/yaksh/tests/test_code_server.py +++ b/yaksh/tests/test_code_server.py @@ -1,17 +1,15 @@ import json -from multiprocessing import Process try: from Queue import Queue except ImportError: from queue import Queue from threading import Thread import unittest - +import urllib from yaksh.code_server import ServerPool, SERVER_POOL_PORT - from yaksh import settings -from yaksh.xmlrpc_clients import code_server +from yaksh.xmlrpc_clients import CodeServerProxy class TestCodeServer(unittest.TestCase): @@ -23,24 +21,26 @@ class TestCodeServer(unittest.TestCase): ports = range(8001, 8006) server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT) cls.server_pool = server_pool - cls.server_proc = p = Process(target=server_pool.run) - p.start() - + cls.server_thread = t = Thread(target=server_pool.run) + t.start() @classmethod def tearDownClass(cls): cls.server_pool.stop() - cls.server_proc.terminate() + cls.server_thread.join() settings.code_evaluators['python']['standardtestcase'] = \ "python_assertion_evaluator.PythonAssertionEvaluator" + def setUp(self): + self.code_server = CodeServerProxy() + def test_inifinite_loop(self): # Given testdata = {'user_answer': 'while True: pass', 'test_case_data': [{'test_case':'assert 1==2'}]} # When - result = code_server.run_code( + result = self.code_server.run_code( 'python', 'standardtestcase', json.dumps(testdata), '' ) @@ -55,7 +55,7 @@ class TestCodeServer(unittest.TestCase): 'test_case_data': [{'test_case':'assert f() == 1'}]} # When - result = code_server.run_code( + result = self.code_server.run_code( 'python', 'standardtestcase', json.dumps(testdata), '' ) @@ -70,7 +70,7 @@ class TestCodeServer(unittest.TestCase): 'test_case_data': [{'test_case':'assert f() == 2'}]} # When - result = code_server.run_code( + result = self.code_server.run_code( 'python', 'standardtestcase', json.dumps(testdata), '' ) @@ -87,12 +87,12 @@ class TestCodeServer(unittest.TestCase): """Run an infinite loop.""" testdata = {'user_answer': 'while True: pass', 'test_case_data': [{'test_case':'assert 1==2'}]} - result = code_server.run_code( + result = self.code_server.run_code( 'python', 'standardtestcase', json.dumps(testdata), '' ) results.put(json.loads(result)) - N = 5 + N = 10 # When import time threads = [] @@ -112,6 +112,19 @@ class TestCodeServer(unittest.TestCase): self.assertFalse(data['success']) self.assertTrue('infinite loop' in data['error']) + def test_server_pool_status(self): + # Given + url = "http://localhost:%s/status"%SERVER_POOL_PORT + + # When + data = urllib.urlopen(url).read() + + # Then + expect = 'out of 5 are free' + self.assertTrue(expect in data) + expect = 'Load:' + self.assertTrue(expect in data) + if __name__ == '__main__': unittest.main() diff --git a/yaksh/xmlrpc_clients.py b/yaksh/xmlrpc_clients.py index 7124550..6bfe0d6 100644 --- a/yaksh/xmlrpc_clients.py +++ b/yaksh/xmlrpc_clients.py @@ -3,6 +3,7 @@ import time import random import socket import json +import urllib from settings import SERVER_PORTS, SERVER_POOL_PORT @@ -21,7 +22,7 @@ class CodeServerProxy(object): """ def __init__(self): pool_url = 'http://localhost:%d' % (SERVER_POOL_PORT) - self.pool_server = ServerProxy(pool_url) + self.pool_url = pool_url def run_code(self, language, test_case_type, json_data, user_dir): """Tests given code (`answer`) with the `test_code` supplied. If the @@ -34,7 +35,7 @@ class CodeServerProxy(object): ---------- json_data contains; user_answer : str - The user's answer for the question. + The user's answer for the question. test_code : str The test code to check the user code with. language : str @@ -57,21 +58,7 @@ class CodeServerProxy(object): return result def _get_server(self): - # Get a suitable server from our pool of servers. This may block. We - # try about 60 times, essentially waiting at most for about 30 seconds. - done, count = False, 60 - - while not done and count > 0: - try: - port = self.pool_server.get_server_port() - except socket.error: - # Wait a while try again. - time.sleep(random.random()) - count -= 1 - else: - done = True - if not done: - raise ConnectionError("Couldn't connect to a server!") + port = json.loads(urllib.urlopen(self.pool_url).read()) proxy = ServerProxy('http://localhost:%d' % port) return proxy |