summary refs log tree commit diff
diff options
context:
space:
mode:
author Prabhu Ramachandran 2017-08-04 16:24:38 +0530
committer Prabhu Ramachandran 2017-08-04 16:24:38 +0530
commit af06b0d65c1b3ae1f04be1d18c155677e7bf7922 (patch)
tree d081a14e13592d51930fb0a593c9b04faa4663f8
parent 08eaf36d6345212a580a2ccddcdfae7bea85191d (diff)
download online_test-af06b0d65c1b3ae1f04be1d18c155677e7bf7922.tar.gz
online_test-af06b0d65c1b3ae1f04be1d18c155677e7bf7922.tar.bz2
online_test-af06b0d65c1b3ae1f04be1d18c155677e7bf7922.zip
Completely rewrite the server pool.
It is now much simpler. There are no XML-RPC servers anymore; instead, the tornado server takes a POST request to submit a job asynchronously, and the results are added to a shared dictionary. A GET request can be used to check the status of a submitted job. This allows the submission and checking of code to be completely asynchronous and will make the application a lot more scalable.
-rw-r--r-- yaksh/code_server.py 246
-rw-r--r-- yaksh/settings.py 6
-rw-r--r-- yaksh/tests/test_code_server.py 131
3 files changed, 206 insertions, 177 deletions
diff --git a/yaksh/code_server.py b/yaksh/code_server.py
index 834c5af..6acce74 100644
--- a/yaksh/code_server.py
+++ b/yaksh/code_server.py
@@ -1,59 +1,34 @@
#!/usr/bin/env python
-"""This server runs an HTTP server (using tornado) and several code servers
-using XMLRPC that can be submitted code
-and tests and returns the output. It *should* be run as root and will run as
-the user 'nobody' so as to minimize any damange by errant code. This can be
-configured by editing settings.py to run as many servers as desired. One can
-also specify the ports on the command line. Here are examples::
+"""This server runs an HTTP server (using tornado) to which code can be
+submitted for checking. This is asynchronous so once submitted the user can
+check for the result.
- $ sudo ./code_server.py
- # Runs servers based on settings.py:SERVER_PORTS one server per port given.
-
-or::
-
- $ sudo ./code_server.py 8001 8002 8003 8004 8005
- # Runs 5 servers on ports specified.
-
-All these servers should be running as nobody. This will also start a server
-pool that defaults to port 50000 and is configurable in
-settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function
-that returns an available server.
+It *should* be run as root and will run as the user 'nobody' so as to minimize
+any damage by errant code. This can be configured by editing settings.py to
+run as many servers as desired.
"""
# Standard library imports
from __future__ import unicode_literals
+from argparse import ArgumentParser
import json
-from multiprocessing import Process, Queue
+from multiprocessing import Process, Queue, Manager
import os
-from os.path import isdir, dirname, abspath, join, isfile
+from os.path import dirname, abspath
import pwd
-import re
-import signal
-import stat
-import subprocess
import sys
-
-try:
- from SimpleXMLRPCServer import SimpleXMLRPCServer
-except ImportError:
- # The above import will not work on Python-3.x.
- from xmlrpc.server import SimpleXMLRPCServer
-
-try:
- from urllib import unquote
-except ImportError:
- # The above import will not work on Python-3.x.
- from urllib.parse import unquote
+import time
# Library imports
+import requests
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler
+from six.moves import urllib
# Local imports
-from .settings import SERVER_PORTS, SERVER_POOL_PORT
-from .language_registry import create_evaluator_instance
+from .settings import N_CODE_SERVERS, SERVER_POOL_PORT
from .grader import Grader
@@ -69,70 +44,45 @@ def run_as_nobody():
os.seteuid(nobody.pw_uid)
-###############################################################################
-# `CodeServer` class.
-###############################################################################
-class CodeServer(object):
- """A code server that executes user submitted test code, tests it and
- reports if the code was correct or not.
+def check_code(job_queue, results):
+ """Check the code, this runs forever.
"""
- def __init__(self, port, queue):
- self.port = port
- self.queue = queue
-
- # Public Protocol ##########
- def check_code(self, language, json_data, in_dir=None):
- """Calls relevant EvaluateCode class based on language to check the
- answer code
- """
+ while True:
+ uid, json_data, user_dir = job_queue.get(True)
+ results[uid] = dict(status='running', result=None)
data = json.loads(json_data)
- grader = Grader(in_dir)
- result = grader.evaluate(data)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- return json.dumps(result)
-
- def run(self):
- """Run XMLRPC server, serving our methods."""
- server = SimpleXMLRPCServer(("0.0.0.0", self.port))
- self.server = server
- server.register_instance(self)
- self.queue.put(self.port)
- server.serve_forever()
+ grader = Grader(user_dir)
+ result = grader.evaluate(data)
+ results[uid] = dict(status='done', result=json.dumps(result))
###############################################################################
# `ServerPool` class.
###############################################################################
class ServerPool(object):
- """Manages a pool of CodeServer objects."""
- def __init__(self, ports, pool_port=50000):
- """Create a pool of servers. Uses a shared Queue to get available
- servers.
+ """Manages a pool of processes checking code."""
+ def __init__(self, n, pool_port=50000):
+ """Create a pool of servers.
Parameters
----------
- ports : list(int)
- List of ports at which the CodeServer's should run.
+ n : int
+ Number of code servers to run
pool_port : int
Port at which the server pool should serve.
"""
+ self.n = n
+ self.manager = Manager()
+ self.results = self.manager.dict()
self.my_port = pool_port
- self.ports = ports
- queue = Queue(maxsize=len(self.ports))
- self.queue = queue
- servers = []
+
+ self.job_queue = Queue()
processes = []
- for port in self.ports:
- server = CodeServer(port, queue)
- servers.append(server)
- p = Process(target=server.run)
+ for i in range(n):
+ p = Process(target=check_code, args=(self.job_queue, self.results))
processes.append(p)
- self.servers = servers
self.processes = processes
self.app = self._make_app()
@@ -150,21 +100,26 @@ class ServerPool(object):
# Public Protocol ##########
- def get_server_port(self):
- """Get available server port from ones in the pool. This will block
- till it gets an available server.
- """
- return self.queue.get()
-
def get_status(self):
- """Returns current queue size and total number of ports used."""
- try:
- qs = self.queue.qsize()
- except NotImplementedError:
- # May not work on OS X so we return a dummy.
- qs = len(self.ports)
-
- return qs, len(self.ports)
+ """Returns the number of queued jobs, live processes and running jobs.
+ """
+ qs = sum(r['status'] == 'not started'
+ for r in self.results.values())
+ alive = sum(p.is_alive() for p in self.processes)
+ n_running = sum(r['status'] == 'running'
+ for r in self.results.values())
+
+ return qs, alive, n_running
+
+ def submit(self, uid, json_data, user_dir):
+ self.results[uid] = dict(status='not started')
+ self.job_queue.put((uid, json_data, user_dir))
+
+ def get_result(self, uid):
+ result = self.results.get(uid, dict(status='unknown'))
+ if result.get('status') == 'done':
+ self.results.pop(uid)
+ return json.dumps(result)
def run(self):
"""Run server which returns an available server port where code
@@ -189,24 +144,95 @@ class MainHandler(RequestHandler):
def get(self):
path = self.request.path[1:]
if len(path) == 0:
- port = self.server.get_server_port()
- self.write(str(port))
- elif path == "status":
- q_size, total = self.server.get_status()
- result = "%d servers out of %d are free.\n"%(q_size, total)
- load = float(total - q_size)/total*100
- result += "Load: %s%%\n"%load
+ q_size, alive, running = self.server.get_status()
+ result = "%d processes, %d running, %d queued" % (
+ alive, running, q_size
+ )
self.write(result)
+ else:
+ uid = path
+ json_result = self.server.get_result(uid)
+ self.write(json_result)
+
+ def post(self):
+ uid = self.get_argument('uid')
+ json_data = self.get_argument('json_data')
+ user_dir = self.get_argument('user_dir')
+ self.server.submit(uid, json_data, user_dir)
+ self.write('OK')
+
+
+def submit(url, uid, json_data, user_dir):
+ '''Submit a job to the code server.
+
+ Parameters
+ ----------
+
+ url : str
+ URL of the server pool.
+
+ uid : str
+ Unique ID of the submission.
+
+ json_data : jsonized str
+ Data to send to the code checker.
+
+ user_dir : str
+ User directory.
+ '''
+ requests.post(
+ url, data=dict(uid=uid, json_data=json_data, user_dir=user_dir)
+ )
+
+
+def get_result(url, uid, block=False):
+ '''Get the status of a job submitted to the code server.
+
+ Returns the result currently known in the form of a dict. The dictionary
+ always has a 'status' key, one of ['running', 'not started', 'done',
+ 'unknown'], and once the job has started a 'result' key, which is None
+ until done and then the result of the code execution as a jsonized string.
+
+ Parameters
+ ----------
+
+ url : str
+ URL of the server pool.
+
+ uid : str
+ Unique ID of the submission.
+
+ block : bool
+ Set to True if you wish to block till result is done.
+
+ '''
+ def _get_data():
+ r = requests.get(urllib.parse.urljoin(url, str(uid)))
+ return json.loads(r.content)
+ data = _get_data()
+ if block:
+ while data.get('status') != 'done':
+ time.sleep(0.1)
+ data = _get_data()
+
+ return data
###############################################################################
def main(args=None):
- if args:
- ports = [int(x) for x in args]
- else:
- ports = SERVER_PORTS
-
- server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT)
+ parser = ArgumentParser(description=__doc__)
+ parser.add_argument(
+ 'n', nargs='?', type=int, default=N_CODE_SERVERS,
+ help="Number of servers to run."
+ )
+ parser.add_argument(
+ '-p', '--port', dest='port', default=SERVER_POOL_PORT,
+ help="Port at which the http server should run."
+ )
+
+ options = parser.parse_args(args)
+
+ server_pool = ServerPool(n=options.n, pool_port=options.port)
# This is done *after* the server pool is created because when the tornado
# app calls listen(), it cannot be nobody.
run_as_nobody()
diff --git a/yaksh/settings.py b/yaksh/settings.py
index 72f9fda..d500d93 100644
--- a/yaksh/settings.py
+++ b/yaksh/settings.py
@@ -1,9 +1,9 @@
"""
settings for yaksh app.
"""
-# The ports the code server should run on. This will run one separate
-# server for each port listed in the following list.
-SERVER_PORTS = [8001] # range(8001, 8026)
+
+# The number of code server processes to run.
+N_CODE_SERVERS = 5
# The server pool port. This is the server which returns available server
# ports so as to minimize load. This is some random number where no other
diff --git a/yaksh/tests/test_code_server.py b/yaksh/tests/test_code_server.py
index 47c1da7..a73c12f 100644
--- a/yaksh/tests/test_code_server.py
+++ b/yaksh/tests/test_code_server.py
@@ -8,9 +8,8 @@ from threading import Thread
import unittest
from six.moves import urllib
-from yaksh.code_server import ServerPool, SERVER_POOL_PORT
+from yaksh.code_server import ServerPool, SERVER_POOL_PORT, submit, get_result
from yaksh import settings
-from yaksh.xmlrpc_clients import CodeServerProxy
class TestCodeServer(unittest.TestCase):
@@ -19,8 +18,7 @@ class TestCodeServer(unittest.TestCase):
def setUpClass(cls):
settings.code_evaluators['python']['standardtestcase'] = \
"yaksh.python_assertion_evaluator.PythonAssertionEvaluator"
- ports = range(8001, 8006)
- server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT)
+ server_pool = ServerPool(n=5, pool_port=SERVER_POOL_PORT)
cls.server_pool = server_pool
cls.server_thread = t = Thread(target=server_pool.run)
t.start()
@@ -33,70 +31,78 @@ class TestCodeServer(unittest.TestCase):
"python_assertion_evaluator.PythonAssertionEvaluator"
def setUp(self):
- self.code_server = CodeServerProxy()
+ self.url = 'http://localhost:%s' % SERVER_POOL_PORT
def test_infinite_loop(self):
# Given
- testdata = {'metadata': {'user_answer': 'while True: pass',
- 'language': 'python',
- 'partial_grading': False
- },
- 'test_case_data': [{'test_case':'assert 1==2',
- 'test_case_type': 'standardtestcase',
- 'weight': 0.0
- }]
- }
+ testdata = {
+ 'metadata': {
+ 'user_answer': 'while True: pass',
+ 'language': 'python',
+ 'partial_grading': False
+ },
+ 'test_case_data': [
+ {'test_case': 'assert 1==2',
+ 'test_case_type': 'standardtestcase',
+ 'weight': 0.0}
+ ]
+ }
# When
- result = self.code_server.run_code(
- 'python', json.dumps(testdata), ''
- )
+ submit(self.url, '0', json.dumps(testdata), '')
+ result = get_result(self.url, '0')
# Then
- data = json.loads(result)
+ self.assertTrue(result.get('status') in ['running', 'not started'])
+
+ # When
+ result = get_result(self.url, '0', block=True)
+
+ # Then
+ data = json.loads(result.get('result'))
self.assertFalse(data['success'])
self.assertTrue('infinite loop' in data['error'][0])
def test_correct_answer(self):
# Given
- testdata = {'metadata': { 'user_answer': 'def f(): return 1',
- 'language': 'python',
- 'partial_grading': False
- },
- 'test_case_data': [{'test_case':'assert f() == 1',
- 'test_case_type': 'standardtestcase',
- 'weight': 0.0
- }]
- }
+ testdata = {
+ 'metadata': {
+ 'user_answer': 'def f(): return 1',
+ 'language': 'python',
+ 'partial_grading': False
+ },
+ 'test_case_data': [{'test_case': 'assert f() == 1',
+ 'test_case_type': 'standardtestcase',
+ 'weight': 0.0}]
+ }
# When
- result = self.code_server.run_code(
- 'python', json.dumps(testdata), ''
- )
+ submit(self.url, '0', json.dumps(testdata), '')
+ result = get_result(self.url, '0', block=True)
# Then
- data = json.loads(result)
+ data = json.loads(result.get('result'))
self.assertTrue(data['success'])
def test_wrong_answer(self):
# Given
- testdata = {'metadata': { 'user_answer': 'def f(): return 1',
- 'language': 'python',
- 'partial_grading': False
- },
- 'test_case_data': [{'test_case':'assert f() == 2',
- 'test_case_type': 'standardtestcase',
- 'weight': 0.0
- }]
- }
+ testdata = {
+ 'metadata': {
+ 'user_answer': 'def f(): return 1',
+ 'language': 'python',
+ 'partial_grading': False
+ },
+ 'test_case_data': [{'test_case': 'assert f() == 2',
+ 'test_case_type': 'standardtestcase',
+ 'weight': 0.0}]
+ }
# When
- result = self.code_server.run_code(
- 'python', json.dumps(testdata), ''
- )
+ submit(self.url, '0', json.dumps(testdata), '')
+ result = get_result(self.url, '0', block=True)
# Then
- data = json.loads(result)
+ data = json.loads(result.get('result'))
self.assertFalse(data['success'])
self.assertTrue('AssertionError' in data['error'][0])
@@ -104,28 +110,27 @@ class TestCodeServer(unittest.TestCase):
# Given
results = Queue()
- def run_code():
+ def run_code(uid):
"""Run an infinite loop."""
- testdata = {'metadata': { 'user_answer': 'while True: pass',
- 'language': 'python',
- 'partial_grading': False
- },
- 'test_case_data': [{'test_case':'assert 1==2',
- 'test_case_type': 'standardtestcase',
- 'weight': 0.0
- }]
- }
- result = self.code_server.run_code(
- 'python', json.dumps(testdata), ''
- )
- results.put(json.loads(result))
+ testdata = {
+ 'metadata': {
+ 'user_answer': 'while True: pass',
+ 'language': 'python',
+ 'partial_grading': False
+ },
+ 'test_case_data': [{'test_case': 'assert 1==2',
+ 'test_case_type': 'standardtestcase',
+ 'weight': 0.0}]
+ }
+ submit(self.url, uid, json.dumps(testdata), '')
+ result = get_result(self.url, uid, block=True)
+ results.put(json.loads(result.get('result')))
N = 10
# When
- import time
threads = []
for i in range(N):
- t = Thread(target=run_code)
+ t = Thread(target=run_code, args=(str(i),))
threads.append(t)
t.start()
@@ -142,16 +147,14 @@ class TestCodeServer(unittest.TestCase):
def test_server_pool_status(self):
# Given
- url = "http://localhost:%s/status"%SERVER_POOL_PORT
+ url = "http://localhost:%s/" % SERVER_POOL_PORT
# When
response = urllib.request.urlopen(url)
data = response.read().decode('utf-8')
# Then
- expect = 'out of 5 are free'
- self.assertTrue(expect in data)
- expect = 'Load:'
+ expect = '5 processes, 0 running, 0 queued'
self.assertTrue(expect in data)