author     Prabhu Ramachandran    2017-09-15 16:31:50 +0530
committer  GitHub                 2017-09-15 16:31:50 +0530
commit     7419e3b3f4e14f86f21f9464843f9263638fe7a2 (patch)
tree       a046617f76879ea5c056ae2c8f39b0a97c8a18a7 /yaksh/code_server.py
parent     e3a43662d2aae8688039671d3de532e48fbdfda9 (diff)
parent     f65102cf4b6a117a3ff86971ad9c1ddd3362c9fd (diff)
download   online_test-7419e3b3f4e14f86f21f9464843f9263638fe7a2.tar.gz
           online_test-7419e3b3f4e14f86f21f9464843f9263638fe7a2.tar.bz2
           online_test-7419e3b3f4e14f86f21f9464843f9263638fe7a2.zip
Merge pull request #326 from FOSSEE/improve-code-server
Improve code server
Diffstat (limited to 'yaksh/code_server.py')
-rwxr-xr-x [-rw-r--r--]   yaksh/code_server.py   274
1 file changed, 162 insertions, 112 deletions
diff --git a/yaksh/code_server.py b/yaksh/code_server.py
index 834c5af..75dd9b2 100644..100755
--- a/yaksh/code_server.py
+++ b/yaksh/code_server.py
@@ -1,59 +1,34 @@
#!/usr/bin/env python
-"""This server runs an HTTP server (using tornado) and several code servers
-using XMLRPC that can be submitted code
-and tests and returns the output. It *should* be run as root and will run as
-the user 'nobody' so as to minimize any damange by errant code. This can be
-configured by editing settings.py to run as many servers as desired. One can
-also specify the ports on the command line. Here are examples::
+"""This server runs an HTTP server (using tornado) to which code can be
+submitted for checking. This is asynchronous so once submitted the user can
+check for the result.
- $ sudo ./code_server.py
- # Runs servers based on settings.py:SERVER_PORTS one server per port given.
-
-or::
-
- $ sudo ./code_server.py 8001 8002 8003 8004 8005
- # Runs 5 servers on ports specified.
-
-All these servers should be running as nobody. This will also start a server
-pool that defaults to port 50000 and is configurable in
-settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function
-that returns an available server.
+It *should* be run as root and will run as the user 'nobody' so as to minimize
+any damage by errant code. This can be configured by editing settings.py to
+run as many servers as desired.
"""
# Standard library imports
from __future__ import unicode_literals
+from argparse import ArgumentParser
import json
-from multiprocessing import Process, Queue
+from multiprocessing import Process, Queue, Manager
import os
-from os.path import isdir, dirname, abspath, join, isfile
+from os.path import dirname, abspath
import pwd
-import re
-import signal
-import stat
-import subprocess
import sys
-
-try:
- from SimpleXMLRPCServer import SimpleXMLRPCServer
-except ImportError:
- # The above import will not work on Python-3.x.
- from xmlrpc.server import SimpleXMLRPCServer
-
-try:
- from urllib import unquote
-except ImportError:
- # The above import will not work on Python-3.x.
- from urllib.parse import unquote
+import time
# Library imports
+import requests
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler
+from six.moves import urllib
# Local imports
-from .settings import SERVER_PORTS, SERVER_POOL_PORT
-from .language_registry import create_evaluator_instance
+from .settings import N_CODE_SERVERS, SERVER_POOL_PORT
from .grader import Grader
@@ -69,70 +44,45 @@ def run_as_nobody():
os.seteuid(nobody.pw_uid)
-###############################################################################
-# `CodeServer` class.
-###############################################################################
-class CodeServer(object):
- """A code server that executes user submitted test code, tests it and
- reports if the code was correct or not.
+def check_code(pid, job_queue, results):
+ """Check the code, this runs forever.
"""
- def __init__(self, port, queue):
- self.port = port
- self.queue = queue
-
- # Public Protocol ##########
- def check_code(self, language, json_data, in_dir=None):
- """Calls relevant EvaluateCode class based on language to check the
- answer code
- """
+ while True:
+ uid, json_data, user_dir = job_queue.get(True)
+ results[uid] = dict(status='running', pid=pid, result=None)
data = json.loads(json_data)
- grader = Grader(in_dir)
- result = grader.evaluate(data)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- return json.dumps(result)
-
- def run(self):
- """Run XMLRPC server, serving our methods."""
- server = SimpleXMLRPCServer(("0.0.0.0", self.port))
- self.server = server
- server.register_instance(self)
- self.queue.put(self.port)
- server.serve_forever()
+ grader = Grader(user_dir)
+ result = grader.evaluate(data)
+ results[uid] = dict(status='done', result=json.dumps(result))
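check_code writes into a Manager dict rather than a plain dict because each
worker runs in a separate process and needs a shared, proxied mapping; a
standalone sketch of that pattern (illustrative only, not part of this
module)::

    from multiprocessing import Manager, Process, Queue

    def worker(jobs, results):
        uid, text = jobs.get(True)           # blocks until a job arrives
        results[uid] = dict(status='done', result=text.upper())

    if __name__ == '__main__':
        manager = Manager()
        results = manager.dict()             # proxy dict shared across processes
        jobs = Queue()
        p = Process(target=worker, args=(jobs, results))
        p.start()
        jobs.put(('1', 'hello'))
        p.join()
        print(results['1'])                  # {'status': 'done', 'result': 'HELLO'}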
###############################################################################
# `ServerPool` class.
###############################################################################
class ServerPool(object):
- """Manages a pool of CodeServer objects."""
- def __init__(self, ports, pool_port=50000):
- """Create a pool of servers. Uses a shared Queue to get available
- servers.
+ """Manages a pool of processes checking code."""
+ def __init__(self, n, pool_port=50000):
+ """Create a pool of servers.
Parameters
----------
- ports : list(int)
- List of ports at which the CodeServer's should run.
+ n : int
+ Number of code servers to run
pool_port : int
Port at which the server pool should serve.
"""
+ self.n = n
+ self.manager = Manager()
+ self.results = self.manager.dict()
self.my_port = pool_port
- self.ports = ports
- queue = Queue(maxsize=len(self.ports))
- self.queue = queue
- servers = []
+
+ self.job_queue = Queue()
processes = []
- for port in self.ports:
- server = CodeServer(port, queue)
- servers.append(server)
- p = Process(target=server.run)
+ for i in range(n):
+ p = self._make_process(i)
processes.append(p)
- self.servers = servers
self.processes = processes
self.app = self._make_app()
@@ -143,28 +93,56 @@ class ServerPool(object):
app.listen(self.my_port)
return app
+ def _make_process(self, pid):
+ return Process(
+ target=check_code, args=(pid, self.job_queue, self.results)
+ )
+
def _start_code_servers(self):
for proc in self.processes:
if proc.pid is None:
proc.start()
- # Public Protocol ##########
+ def _handle_dead_process(self, result):
+ if result.get('status') == 'running':
+ pid = result.get('pid')
+ proc = self.processes[pid]
+ if not proc.is_alive():
+ # If the process is dead, something bad happened, so
+ # restart that process.
+ new_proc = self._make_process(pid)
+ self.processes[pid] = new_proc
+ new_proc.start()
+ result['status'] = 'done'
+ result['result'] = json.dumps(dict(
+ success=False, weight=0.0,
+ error=['Process ended with exit code %s.'
+ % proc.exitcode]
+ ))
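The restart logic above relies on the standard multiprocessing.Process API:
is_alive() and exitcode (the negative of the terminating signal when the
process was killed). A small POSIX-only illustration, separate from this
module::

    import os
    import signal
    from multiprocessing import Process

    def crash():
        os.kill(os.getpid(), signal.SIGKILL)   # simulate a dying worker

    if __name__ == '__main__':
        p = Process(target=crash)
        p.start()
        p.join()
        print(p.is_alive(), p.exitcode)        # False -9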
- def get_server_port(self):
- """Get available server port from ones in the pool. This will block
- till it gets an available server.
- """
- return self.queue.get()
+ # Public Protocol ##########
def get_status(self):
- """Returns current queue size and total number of ports used."""
- try:
- qs = self.queue.qsize()
- except NotImplementedError:
- # May not work on OS X so we return a dummy.
- qs = len(self.ports)
-
- return qs, len(self.ports)
+ """Returns current job queue size, total number of processes alive.
+ """
+ qs = sum(r['status'] == 'not started'
+ for r in self.results.values())
+ alive = sum(p.is_alive() for p in self.processes)
+ n_running = sum(r['status'] == 'running'
+ for r in self.results.values())
+
+ return qs, alive, n_running
+
+ def submit(self, uid, json_data, user_dir):
+ self.results[uid] = dict(status='not started')
+ self.job_queue.put((uid, json_data, user_dir))
+
+ def get_result(self, uid):
+ result = self.results.get(uid, dict(status='unknown'))
+ self._handle_dead_process(result)
+ if result.get('status') == 'done':
+ self.results.pop(uid)
+ return json.dumps(result)
def run(self):
"""Run server which returns an available server port where code
@@ -189,30 +167,102 @@ class MainHandler(RequestHandler):
def get(self):
path = self.request.path[1:]
if len(path) == 0:
- port = self.server.get_server_port()
- self.write(str(port))
- elif path == "status":
- q_size, total = self.server.get_status()
- result = "%d servers out of %d are free.\n"%(q_size, total)
- load = float(total - q_size)/total*100
- result += "Load: %s%%\n"%load
+ q_size, alive, running = self.server.get_status()
+ result = "%d processes, %d running, %d queued" % (
+ alive, running, q_size
+ )
self.write(result)
+ else:
+ uid = path
+ json_result = self.server.get_result(uid)
+ self.write(json_result)
+
+ def post(self):
+ uid = self.get_argument('uid')
+ json_data = self.get_argument('json_data')
+ user_dir = self.get_argument('user_dir')
+ self.server.submit(uid, json_data, user_dir)
+ self.write('OK')
+
+
+def submit(url, uid, json_data, user_dir):
+ '''Submit a job to the code server.
+
+ Parameters
+ ----------
+
+ url : str
+ URL of the server pool.
+
+ uid : str
+ Unique ID of the submission.
+
+ json_data : jsonized str
+ Data to send to the code checker.
+
+ user_dir : str
+ User directory.
+ '''
+ requests.post(
+ url, data=dict(uid=uid, json_data=json_data, user_dir=user_dir)
+ )
+
+
+def get_result(url, uid, block=False):
+ '''Get the status of a job submitted to the code server.
+
+ Returns the result currently known in the form of a dict. The dictionary
+ contains two keys, 'status' and 'result'. The status can be one of
+ ['running', 'not started', 'done', 'unknown']. The result is the result of
+ the code execution as a jsonized string.
+
+ Parameters
+ ----------
+
+ url : str
+ URL of the server pool.
+
+ uid : str
+ Unique ID of the submission.
+
+ block : bool
+ Set to True if you wish to block till result is done.
+
+ '''
+ def _get_data():
+ r = requests.get(urllib.parse.urljoin(url, str(uid)))
+ return json.loads(r.content.decode('utf-8'))
+ data = _get_data()
+ if block:
+ while data.get('status') != 'done':
+ time.sleep(0.1)
+ data = _get_data()
+
+ return data
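Together, these two helpers form the client API used on the web-server side; a
hypothetical call sequence (the uid, user_dir and payload are placeholders)::

    import json

    from yaksh.code_server import get_result, submit

    url = 'http://localhost:50000/'
    submit(url, uid='42', json_data='{}', user_dir='/tmp/user42')
    data = get_result(url, uid='42', block=True)   # polls every 0.1 s until done
    result = json.loads(data['result'])            # keys such as 'success', 'weight', 'error'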
###############################################################################
def main(args=None):
- if args:
- ports = [int(x) for x in args]
- else:
- ports = SERVER_PORTS
-
- server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT)
- # This is done *after* the server pool is created because when the tornado
- # app calls listen(), it cannot be nobody.
+ parser = ArgumentParser(description=__doc__)
+ parser.add_argument(
+ 'n', nargs='?', type=int, default=N_CODE_SERVERS,
+ help="Number of servers to run."
+ )
+ parser.add_argument(
+ '-p', '--port', dest='port', default=SERVER_POOL_PORT,
+ help="Port at which the http server should run."
+ )
+
+ options = parser.parse_args(args)
+
+ # Called before the ServerPool is created so that multiprocessing
+ # works properly.
run_as_nobody()
+ server_pool = ServerPool(n=options.n, pool_port=options.port)
server_pool.run()
+
if __name__ == '__main__':
args = sys.argv[1:]
main(args)
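With the argparse interface, the pool can also be started from Python instead
of the command line; a sketch that assumes the process has root privileges
(run_as_nobody() drops them) and that blocks in the tornado IOLoop::

    from yaksh.code_server import main

    # Start 4 checker processes; the HTTP pool listens on SERVER_POOL_PORT.
    main(['4'])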