diff options
-rw-r--r-- | yaksh/code_server.py | 246 | ||||
-rw-r--r-- | yaksh/settings.py | 6 | ||||
-rw-r--r-- | yaksh/tests/test_code_server.py | 131 |
3 files changed, 206 insertions, 177 deletions
diff --git a/yaksh/code_server.py b/yaksh/code_server.py index 834c5af..6acce74 100644 --- a/yaksh/code_server.py +++ b/yaksh/code_server.py @@ -1,59 +1,34 @@ #!/usr/bin/env python -"""This server runs an HTTP server (using tornado) and several code servers -using XMLRPC that can be submitted code -and tests and returns the output. It *should* be run as root and will run as -the user 'nobody' so as to minimize any damange by errant code. This can be -configured by editing settings.py to run as many servers as desired. One can -also specify the ports on the command line. Here are examples:: +"""This server runs an HTTP server (using tornado) to which code can be +submitted for checking. This is asynchronous so once submitted the user can +check for the result. - $ sudo ./code_server.py - # Runs servers based on settings.py:SERVER_PORTS one server per port given. - -or:: - - $ sudo ./code_server.py 8001 8002 8003 8004 8005 - # Runs 5 servers on ports specified. - -All these servers should be running as nobody. This will also start a server -pool that defaults to port 50000 and is configurable in -settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function -that returns an available server. +It *should* be run as root and will run as the user 'nobody' so as to minimize +any damange by errant code. This can be configured by editing settings.py to +run as many servers as desired. """ # Standard library imports from __future__ import unicode_literals +from argparse import ArgumentParser import json -from multiprocessing import Process, Queue +from multiprocessing import Process, Queue, Manager import os -from os.path import isdir, dirname, abspath, join, isfile +from os.path import dirname, abspath import pwd -import re -import signal -import stat -import subprocess import sys - -try: - from SimpleXMLRPCServer import SimpleXMLRPCServer -except ImportError: - # The above import will not work on Python-3.x. - from xmlrpc.server import SimpleXMLRPCServer - -try: - from urllib import unquote -except ImportError: - # The above import will not work on Python-3.x. - from urllib.parse import unquote +import time # Library imports +import requests from tornado.ioloop import IOLoop from tornado.web import Application, RequestHandler +from six.moves import urllib # Local imports -from .settings import SERVER_PORTS, SERVER_POOL_PORT -from .language_registry import create_evaluator_instance +from .settings import N_CODE_SERVERS, SERVER_POOL_PORT from .grader import Grader @@ -69,70 +44,45 @@ def run_as_nobody(): os.seteuid(nobody.pw_uid) -############################################################################### -# `CodeServer` class. -############################################################################### -class CodeServer(object): - """A code server that executes user submitted test code, tests it and - reports if the code was correct or not. +def check_code(job_queue, results): + """Check the code, this runs forever. """ - def __init__(self, port, queue): - self.port = port - self.queue = queue - - # Public Protocol ########## - def check_code(self, language, json_data, in_dir=None): - """Calls relevant EvaluateCode class based on language to check the - answer code - """ + while True: + uid, json_data, user_dir = job_queue.get(True) + results[uid] = dict(status='running', result=None) data = json.loads(json_data) - grader = Grader(in_dir) - result = grader.evaluate(data) - - # Put us back into the server pool queue since we are free now. - self.queue.put(self.port) - - return json.dumps(result) - - def run(self): - """Run XMLRPC server, serving our methods.""" - server = SimpleXMLRPCServer(("0.0.0.0", self.port)) - self.server = server - server.register_instance(self) - self.queue.put(self.port) - server.serve_forever() + grader = Grader(user_dir) + result = grader.evaluate(data) + results[uid] = dict(status='done', result=json.dumps(result)) ############################################################################### # `ServerPool` class. ############################################################################### class ServerPool(object): - """Manages a pool of CodeServer objects.""" - def __init__(self, ports, pool_port=50000): - """Create a pool of servers. Uses a shared Queue to get available - servers. + """Manages a pool of processes checking code.""" + def __init__(self, n, pool_port=50000): + """Create a pool of servers. Parameters ---------- - ports : list(int) - List of ports at which the CodeServer's should run. + n : int + Number of code servers to run pool_port : int Port at which the server pool should serve. """ + self.n = n + self.manager = Manager() + self.results = self.manager.dict() self.my_port = pool_port - self.ports = ports - queue = Queue(maxsize=len(self.ports)) - self.queue = queue - servers = [] + + self.job_queue = Queue() processes = [] - for port in self.ports: - server = CodeServer(port, queue) - servers.append(server) - p = Process(target=server.run) + for i in range(n): + p = Process(target=check_code, args=(self.job_queue, self.results)) processes.append(p) - self.servers = servers self.processes = processes self.app = self._make_app() @@ -150,21 +100,26 @@ class ServerPool(object): # Public Protocol ########## - def get_server_port(self): - """Get available server port from ones in the pool. This will block - till it gets an available server. - """ - return self.queue.get() - def get_status(self): - """Returns current queue size and total number of ports used.""" - try: - qs = self.queue.qsize() - except NotImplementedError: - # May not work on OS X so we return a dummy. - qs = len(self.ports) - - return qs, len(self.ports) + """Returns current job queue size, total number of processes alive. + """ + qs = sum(r['status'] == 'not started' + for r in self.results.values()) + alive = sum(p.is_alive() for p in self.processes) + n_running = sum(r['status'] == 'running' + for r in self.results.values()) + + return qs, alive, n_running + + def submit(self, uid, json_data, user_dir): + self.results[uid] = dict(status='not started') + self.job_queue.put((uid, json_data, user_dir)) + + def get_result(self, uid): + result = self.results.get(uid, dict(status='unknown')) + if result.get('status') == 'done': + self.results.pop(uid) + return json.dumps(result) def run(self): """Run server which returns an available server port where code @@ -189,24 +144,95 @@ class MainHandler(RequestHandler): def get(self): path = self.request.path[1:] if len(path) == 0: - port = self.server.get_server_port() - self.write(str(port)) - elif path == "status": - q_size, total = self.server.get_status() - result = "%d servers out of %d are free.\n"%(q_size, total) - load = float(total - q_size)/total*100 - result += "Load: %s%%\n"%load + q_size, alive, running = self.server.get_status() + result = "%d processes, %d running, %d queued" % ( + alive, running, q_size + ) self.write(result) + else: + uid = path + json_result = self.server.get_result(uid) + self.write(json_result) + + def post(self): + uid = self.get_argument('uid') + json_data = self.get_argument('json_data') + user_dir = self.get_argument('user_dir') + self.server.submit(uid, json_data, user_dir) + self.write('OK') + + +def submit(url, uid, json_data, user_dir): + '''Submit a job to the code server. + + Parameters + ---------- + + url : str + URL of the server pool. + + uid : str + Unique ID of the submission. + + json_data : jsonized str + Data to send to the code checker. + + user_dir : str + User directory. + ''' + requests.post( + url, data=dict(uid=uid, json_data=json_data, user_dir=user_dir) + ) + + +def get_result(url, uid, block=False): + '''Get the status of a job submitted to the code server. + + Returns the result currently known in the form of a dict. The dictionary + contains two keys, 'status' and 'result'. The status can be one of + ['running', 'not started', 'done', 'unknown']. The result is the result of + the code execution as a jsonized string. + + Parameters + ---------- + + url : str + URL of the server pool. + + uid : str + Unique ID of the submission. + + block : bool + Set to True if you wish to block till result is done. + + ''' + def _get_data(): + r = requests.get(urllib.parse.urljoin(url, str(uid))) + return json.loads(r.content) + data = _get_data() + if block: + while data.get('status') != 'done': + time.sleep(0.1) + data = _get_data() + + return data ############################################################################### def main(args=None): - if args: - ports = [int(x) for x in args] - else: - ports = SERVER_PORTS - - server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT) + parser = ArgumentParser(description=__doc__) + parser.add_argument( + 'n', nargs='?', type=int, default=N_CODE_SERVERS, + help="Number of servers to run." + ) + parser.add_argument( + '-p', '--port', dest='port', default=SERVER_POOL_PORT, + help="Port at which the http server should run." + ) + + options = parser.parse_args(args) + + server_pool = ServerPool(n=options.n, pool_port=options.port) # This is done *after* the server pool is created because when the tornado # app calls listen(), it cannot be nobody. run_as_nobody() diff --git a/yaksh/settings.py b/yaksh/settings.py index 72f9fda..d500d93 100644 --- a/yaksh/settings.py +++ b/yaksh/settings.py @@ -1,9 +1,9 @@ """ settings for yaksh app. """ -# The ports the code server should run on. This will run one separate -# server for each port listed in the following list. -SERVER_PORTS = [8001] # range(8001, 8026) + +# The number of code server processes to run.. +N_CODE_SERVERS = 5 # The server pool port. This is the server which returns available server # ports so as to minimize load. This is some random number where no other diff --git a/yaksh/tests/test_code_server.py b/yaksh/tests/test_code_server.py index 47c1da7..a73c12f 100644 --- a/yaksh/tests/test_code_server.py +++ b/yaksh/tests/test_code_server.py @@ -8,9 +8,8 @@ from threading import Thread import unittest from six.moves import urllib -from yaksh.code_server import ServerPool, SERVER_POOL_PORT +from yaksh.code_server import ServerPool, SERVER_POOL_PORT, submit, get_result from yaksh import settings -from yaksh.xmlrpc_clients import CodeServerProxy class TestCodeServer(unittest.TestCase): @@ -19,8 +18,7 @@ class TestCodeServer(unittest.TestCase): def setUpClass(cls): settings.code_evaluators['python']['standardtestcase'] = \ "yaksh.python_assertion_evaluator.PythonAssertionEvaluator" - ports = range(8001, 8006) - server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT) + server_pool = ServerPool(n=5, pool_port=SERVER_POOL_PORT) cls.server_pool = server_pool cls.server_thread = t = Thread(target=server_pool.run) t.start() @@ -33,70 +31,78 @@ class TestCodeServer(unittest.TestCase): "python_assertion_evaluator.PythonAssertionEvaluator" def setUp(self): - self.code_server = CodeServerProxy() + self.url = 'http://localhost:%s' % SERVER_POOL_PORT def test_infinite_loop(self): # Given - testdata = {'metadata': {'user_answer': 'while True: pass', - 'language': 'python', - 'partial_grading': False - }, - 'test_case_data': [{'test_case':'assert 1==2', - 'test_case_type': 'standardtestcase', - 'weight': 0.0 - }] - } + testdata = { + 'metadata': { + 'user_answer': 'while True: pass', + 'language': 'python', + 'partial_grading': False + }, + 'test_case_data': [ + {'test_case': 'assert 1==2', + 'test_case_type': 'standardtestcase', + 'weight': 0.0} + ] + } # When - result = self.code_server.run_code( - 'python', json.dumps(testdata), '' - ) + submit(self.url, '0', json.dumps(testdata), '') + result = get_result(self.url, '0') # Then - data = json.loads(result) + self.assertTrue(result.get('status') in ['running', 'not started']) + + # When + result = get_result(self.url, '0', block=True) + + # Then + data = json.loads(result.get('result')) self.assertFalse(data['success']) self.assertTrue('infinite loop' in data['error'][0]) def test_correct_answer(self): # Given - testdata = {'metadata': { 'user_answer': 'def f(): return 1', - 'language': 'python', - 'partial_grading': False - }, - 'test_case_data': [{'test_case':'assert f() == 1', - 'test_case_type': 'standardtestcase', - 'weight': 0.0 - }] - } + testdata = { + 'metadata': { + 'user_answer': 'def f(): return 1', + 'language': 'python', + 'partial_grading': False + }, + 'test_case_data': [{'test_case': 'assert f() == 1', + 'test_case_type': 'standardtestcase', + 'weight': 0.0}] + } # When - result = self.code_server.run_code( - 'python', json.dumps(testdata), '' - ) + submit(self.url, '0', json.dumps(testdata), '') + result = get_result(self.url, '0', block=True) # Then - data = json.loads(result) + data = json.loads(result.get('result')) self.assertTrue(data['success']) def test_wrong_answer(self): # Given - testdata = {'metadata': { 'user_answer': 'def f(): return 1', - 'language': 'python', - 'partial_grading': False - }, - 'test_case_data': [{'test_case':'assert f() == 2', - 'test_case_type': 'standardtestcase', - 'weight': 0.0 - }] - } + testdata = { + 'metadata': { + 'user_answer': 'def f(): return 1', + 'language': 'python', + 'partial_grading': False + }, + 'test_case_data': [{'test_case': 'assert f() == 2', + 'test_case_type': 'standardtestcase', + 'weight': 0.0}] + } # When - result = self.code_server.run_code( - 'python', json.dumps(testdata), '' - ) + submit(self.url, '0', json.dumps(testdata), '') + result = get_result(self.url, '0', block=True) # Then - data = json.loads(result) + data = json.loads(result.get('result')) self.assertFalse(data['success']) self.assertTrue('AssertionError' in data['error'][0]) @@ -104,28 +110,27 @@ class TestCodeServer(unittest.TestCase): # Given results = Queue() - def run_code(): + def run_code(uid): """Run an infinite loop.""" - testdata = {'metadata': { 'user_answer': 'while True: pass', - 'language': 'python', - 'partial_grading': False - }, - 'test_case_data': [{'test_case':'assert 1==2', - 'test_case_type': 'standardtestcase', - 'weight': 0.0 - }] - } - result = self.code_server.run_code( - 'python', json.dumps(testdata), '' - ) - results.put(json.loads(result)) + testdata = { + 'metadata': { + 'user_answer': 'while True: pass', + 'language': 'python', + 'partial_grading': False + }, + 'test_case_data': [{'test_case': 'assert 1==2', + 'test_case_type': 'standardtestcase', + 'weight': 0.0}] + } + submit(self.url, uid, json.dumps(testdata), '') + result = get_result(self.url, uid, block=True) + results.put(json.loads(result.get('result'))) N = 10 # When - import time threads = [] for i in range(N): - t = Thread(target=run_code) + t = Thread(target=run_code, args=(str(i),)) threads.append(t) t.start() @@ -142,16 +147,14 @@ class TestCodeServer(unittest.TestCase): def test_server_pool_status(self): # Given - url = "http://localhost:%s/status"%SERVER_POOL_PORT + url = "http://localhost:%s/" % SERVER_POOL_PORT # When response = urllib.request.urlopen(url) data = response.read().decode('utf-8') # Then - expect = 'out of 5 are free' - self.assertTrue(expect in data) - expect = 'Load:' + expect = '5 processes, 0 running, 0 queued' self.assertTrue(expect in data) |