diff options
author | Prabhu Ramachandran | 2017-09-15 16:31:50 +0530 |
---|---|---|
committer | GitHub | 2017-09-15 16:31:50 +0530 |
commit | 7419e3b3f4e14f86f21f9464843f9263638fe7a2 (patch) | |
tree | a046617f76879ea5c056ae2c8f39b0a97c8a18a7 /yaksh/code_server.py | |
parent | e3a43662d2aae8688039671d3de532e48fbdfda9 (diff) | |
parent | f65102cf4b6a117a3ff86971ad9c1ddd3362c9fd (diff) | |
download | online_test-7419e3b3f4e14f86f21f9464843f9263638fe7a2.tar.gz online_test-7419e3b3f4e14f86f21f9464843f9263638fe7a2.tar.bz2 online_test-7419e3b3f4e14f86f21f9464843f9263638fe7a2.zip |
Merge pull request #326 from FOSSEE/improve-code-server
Improve code server
Diffstat (limited to 'yaksh/code_server.py')
-rwxr-xr-x[-rw-r--r--] | yaksh/code_server.py | 274 |
1 files changed, 162 insertions, 112 deletions
diff --git a/yaksh/code_server.py b/yaksh/code_server.py index 834c5af..75dd9b2 100644..100755 --- a/yaksh/code_server.py +++ b/yaksh/code_server.py @@ -1,59 +1,34 @@ #!/usr/bin/env python -"""This server runs an HTTP server (using tornado) and several code servers -using XMLRPC that can be submitted code -and tests and returns the output. It *should* be run as root and will run as -the user 'nobody' so as to minimize any damange by errant code. This can be -configured by editing settings.py to run as many servers as desired. One can -also specify the ports on the command line. Here are examples:: +"""This server runs an HTTP server (using tornado) to which code can be +submitted for checking. This is asynchronous so once submitted the user can +check for the result. - $ sudo ./code_server.py - # Runs servers based on settings.py:SERVER_PORTS one server per port given. - -or:: - - $ sudo ./code_server.py 8001 8002 8003 8004 8005 - # Runs 5 servers on ports specified. - -All these servers should be running as nobody. This will also start a server -pool that defaults to port 50000 and is configurable in -settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function -that returns an available server. +It *should* be run as root and will run as the user 'nobody' so as to minimize +any damange by errant code. This can be configured by editing settings.py to +run as many servers as desired. """ # Standard library imports from __future__ import unicode_literals +from argparse import ArgumentParser import json -from multiprocessing import Process, Queue +from multiprocessing import Process, Queue, Manager import os -from os.path import isdir, dirname, abspath, join, isfile +from os.path import dirname, abspath import pwd -import re -import signal -import stat -import subprocess import sys - -try: - from SimpleXMLRPCServer import SimpleXMLRPCServer -except ImportError: - # The above import will not work on Python-3.x. - from xmlrpc.server import SimpleXMLRPCServer - -try: - from urllib import unquote -except ImportError: - # The above import will not work on Python-3.x. - from urllib.parse import unquote +import time # Library imports +import requests from tornado.ioloop import IOLoop from tornado.web import Application, RequestHandler +from six.moves import urllib # Local imports -from .settings import SERVER_PORTS, SERVER_POOL_PORT -from .language_registry import create_evaluator_instance +from .settings import N_CODE_SERVERS, SERVER_POOL_PORT from .grader import Grader @@ -69,70 +44,45 @@ def run_as_nobody(): os.seteuid(nobody.pw_uid) -############################################################################### -# `CodeServer` class. -############################################################################### -class CodeServer(object): - """A code server that executes user submitted test code, tests it and - reports if the code was correct or not. +def check_code(pid, job_queue, results): + """Check the code, this runs forever. """ - def __init__(self, port, queue): - self.port = port - self.queue = queue - - # Public Protocol ########## - def check_code(self, language, json_data, in_dir=None): - """Calls relevant EvaluateCode class based on language to check the - answer code - """ + while True: + uid, json_data, user_dir = job_queue.get(True) + results[uid] = dict(status='running', pid=pid, result=None) data = json.loads(json_data) - grader = Grader(in_dir) - result = grader.evaluate(data) - - # Put us back into the server pool queue since we are free now. - self.queue.put(self.port) - - return json.dumps(result) - - def run(self): - """Run XMLRPC server, serving our methods.""" - server = SimpleXMLRPCServer(("0.0.0.0", self.port)) - self.server = server - server.register_instance(self) - self.queue.put(self.port) - server.serve_forever() + grader = Grader(user_dir) + result = grader.evaluate(data) + results[uid] = dict(status='done', result=json.dumps(result)) ############################################################################### # `ServerPool` class. ############################################################################### class ServerPool(object): - """Manages a pool of CodeServer objects.""" - def __init__(self, ports, pool_port=50000): - """Create a pool of servers. Uses a shared Queue to get available - servers. + """Manages a pool of processes checking code.""" + def __init__(self, n, pool_port=50000): + """Create a pool of servers. Parameters ---------- - ports : list(int) - List of ports at which the CodeServer's should run. + n : int + Number of code servers to run pool_port : int Port at which the server pool should serve. """ + self.n = n + self.manager = Manager() + self.results = self.manager.dict() self.my_port = pool_port - self.ports = ports - queue = Queue(maxsize=len(self.ports)) - self.queue = queue - servers = [] + + self.job_queue = Queue() processes = [] - for port in self.ports: - server = CodeServer(port, queue) - servers.append(server) - p = Process(target=server.run) + for i in range(n): + p = self._make_process(i) processes.append(p) - self.servers = servers self.processes = processes self.app = self._make_app() @@ -143,28 +93,56 @@ class ServerPool(object): app.listen(self.my_port) return app + def _make_process(self, pid): + return Process( + target=check_code, args=(pid, self.job_queue, self.results) + ) + def _start_code_servers(self): for proc in self.processes: if proc.pid is None: proc.start() - # Public Protocol ########## + def _handle_dead_process(self, result): + if result.get('status') == 'running': + pid = result.get('pid') + proc = self.processes[pid] + if not proc.is_alive(): + # If the processes is dead, something bad happened so + # restart that process. + new_proc = self._make_process(pid) + self.processes[pid] = new_proc + new_proc.start() + result['status'] = 'done' + result['result'] = json.dumps(dict( + success=False, weight=0.0, + error=['Process ended with exit code %s.' + % proc.exitcode] + )) - def get_server_port(self): - """Get available server port from ones in the pool. This will block - till it gets an available server. - """ - return self.queue.get() + # Public Protocol ########## def get_status(self): - """Returns current queue size and total number of ports used.""" - try: - qs = self.queue.qsize() - except NotImplementedError: - # May not work on OS X so we return a dummy. - qs = len(self.ports) - - return qs, len(self.ports) + """Returns current job queue size, total number of processes alive. + """ + qs = sum(r['status'] == 'not started' + for r in self.results.values()) + alive = sum(p.is_alive() for p in self.processes) + n_running = sum(r['status'] == 'running' + for r in self.results.values()) + + return qs, alive, n_running + + def submit(self, uid, json_data, user_dir): + self.results[uid] = dict(status='not started') + self.job_queue.put((uid, json_data, user_dir)) + + def get_result(self, uid): + result = self.results.get(uid, dict(status='unknown')) + self._handle_dead_process(result) + if result.get('status') == 'done': + self.results.pop(uid) + return json.dumps(result) def run(self): """Run server which returns an available server port where code @@ -189,30 +167,102 @@ class MainHandler(RequestHandler): def get(self): path = self.request.path[1:] if len(path) == 0: - port = self.server.get_server_port() - self.write(str(port)) - elif path == "status": - q_size, total = self.server.get_status() - result = "%d servers out of %d are free.\n"%(q_size, total) - load = float(total - q_size)/total*100 - result += "Load: %s%%\n"%load + q_size, alive, running = self.server.get_status() + result = "%d processes, %d running, %d queued" % ( + alive, running, q_size + ) self.write(result) + else: + uid = path + json_result = self.server.get_result(uid) + self.write(json_result) + + def post(self): + uid = self.get_argument('uid') + json_data = self.get_argument('json_data') + user_dir = self.get_argument('user_dir') + self.server.submit(uid, json_data, user_dir) + self.write('OK') + + +def submit(url, uid, json_data, user_dir): + '''Submit a job to the code server. + + Parameters + ---------- + + url : str + URL of the server pool. + + uid : str + Unique ID of the submission. + + json_data : jsonized str + Data to send to the code checker. + + user_dir : str + User directory. + ''' + requests.post( + url, data=dict(uid=uid, json_data=json_data, user_dir=user_dir) + ) + + +def get_result(url, uid, block=False): + '''Get the status of a job submitted to the code server. + + Returns the result currently known in the form of a dict. The dictionary + contains two keys, 'status' and 'result'. The status can be one of + ['running', 'not started', 'done', 'unknown']. The result is the result of + the code execution as a jsonized string. + + Parameters + ---------- + + url : str + URL of the server pool. + + uid : str + Unique ID of the submission. + + block : bool + Set to True if you wish to block till result is done. + + ''' + def _get_data(): + r = requests.get(urllib.parse.urljoin(url, str(uid))) + return json.loads(r.content.decode('utf-8')) + data = _get_data() + if block: + while data.get('status') != 'done': + time.sleep(0.1) + data = _get_data() + + return data ############################################################################### def main(args=None): - if args: - ports = [int(x) for x in args] - else: - ports = SERVER_PORTS - - server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT) - # This is done *after* the server pool is created because when the tornado - # app calls listen(), it cannot be nobody. + parser = ArgumentParser(description=__doc__) + parser.add_argument( + 'n', nargs='?', type=int, default=N_CODE_SERVERS, + help="Number of servers to run." + ) + parser.add_argument( + '-p', '--port', dest='port', default=SERVER_POOL_PORT, + help="Port at which the http server should run." + ) + + options = parser.parse_args(args) + + # Called before serverpool is created so that the multiprocessing + # can work properly. run_as_nobody() + server_pool = ServerPool(n=options.n, pool_port=options.port) server_pool.run() + if __name__ == '__main__': args = sys.argv[1:] main(args) |