summaryrefslogtreecommitdiff
path: root/yaksh
diff options
context:
space:
mode:
authorPrabhu Ramachandran2016-08-12 21:01:08 +0530
committerPrabhu Ramachandran2016-08-12 21:01:08 +0530
commitb39eb538c08d4dd6761d6a970aadf58bde58a7ce (patch)
tree6732cb4c0f89ffc80b7b9aa60d54e2d3ff5c70f0 /yaksh
parent3738c8fefa8ac69508bb6daeee045c1f5ea0cb17 (diff)
downloadonline_test-b39eb538c08d4dd6761d6a970aadf58bde58a7ce.tar.gz
online_test-b39eb538c08d4dd6761d6a970aadf58bde58a7ce.tar.bz2
online_test-b39eb538c08d4dd6761d6a970aadf58bde58a7ce.zip
Use a tornado based server for the pool server.
With the previous XMLRPC based server, an XMLRPC server would respond to a request for an available port. This does not work as the server can only take about 2 simultaneous connections. The server pool now uses a HTTP server via tornado which works extremely well. The django code should not change at all as this is an internal change. This change should make the code server far more robust and work for a very large number of simultaneous users. The http server also has a simple status page to indicate the current load. This will not be correct on OSX due to limitations of the multi-processing Queue implementation on OSX.
Diffstat (limited to 'yaksh')
-rwxr-xr-xyaksh/code_server.py114
-rw-r--r--yaksh/tests/test_code_server.py39
-rw-r--r--yaksh/xmlrpc_clients.py21
3 files changed, 112 insertions, 62 deletions
diff --git a/yaksh/code_server.py b/yaksh/code_server.py
index a2cd08a..e19e9c8 100755
--- a/yaksh/code_server.py
+++ b/yaksh/code_server.py
@@ -1,9 +1,11 @@
#!/usr/bin/env python
-"""This server runs an XMLRPC server that can be submitted code and tests
-and returns the output. It *should* be run as root and will run as the user
-'nobody' so as to minimize any damange by errant code. This can be configured
-by editing settings.py to run as many servers as desired. One can also
-specify the ports on the command line. Here are examples::
+
+"""This server runs an HTTP server (using tornado) and several code servers
+using XMLRPC that can be submitted code
+and tests and returns the output. It *should* be run as root and will run as
+the user 'nobody' so as to minimize any damange by errant code. This can be
+configured by editing settings.py to run as many servers as desired. One can
+also specify the ports on the command line. Here are examples::
$ sudo ./code_server.py
# Runs servers based on settings.py:SERVER_PORTS one server per port given.
@@ -17,18 +19,32 @@ All these servers should be running as nobody. This will also start a server
pool that defaults to port 50000 and is configurable in
settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function
that returns an available server.
+
"""
-import sys
+
+# Standard library imports
from SimpleXMLRPCServer import SimpleXMLRPCServer
-import pwd
+import json
+from multiprocessing import Process, Queue
import os
-import stat
from os.path import isdir, dirname, abspath, join, isfile
+import pwd
+import re
import signal
-from multiprocessing import Process, Queue
+import stat
import subprocess
-import re
-import json
+import sys
+
+try:
+ from urllib import unquote
+except ImportError:
+ # The above import will not work on Python-3.x.
+ from urllib.parse import unquote
+
+# Library imports
+from tornado.ioloop import IOLoop
+from tornado.web import Application, RequestHandler
+
# Local imports.
from settings import SERVER_PORTS, SERVER_POOL_PORT
from language_registry import create_evaluator_instance, unpack_json
@@ -104,17 +120,30 @@ class ServerPool(object):
"""
self.my_port = pool_port
self.ports = ports
- queue = Queue(maxsize=len(ports))
+ queue = Queue(maxsize=len(self.ports))
self.queue = queue
servers = []
- self.processes = []
- for port in ports:
+ processes = []
+ for port in self.ports:
server = CodeServer(port, queue)
servers.append(server)
p = Process(target=server.run)
- p.start()
- self.processes.append(p)
+ processes.append(p)
self.servers = servers
+ self.processes = processes
+ self.app = self._make_app()
+
+ def _make_app(self):
+ app = Application([
+ (r"/.*", MainHandler, dict(server=self)),
+ ])
+ app.listen(self.my_port)
+ return app
+
+ def _start_code_servers(self):
+ for proc in self.processes:
+ if proc.pid is None:
+ proc.start()
# Public Protocol ##########
@@ -122,42 +151,63 @@ class ServerPool(object):
"""Get available server port from ones in the pool. This will block
till it gets an available server.
"""
- q = self.queue
- was_waiting = True if q.empty() else False
- port = q.get()
- if was_waiting:
- print '*'*80
- print "No available servers, was waiting but got server \
- later at %d." % port
- print '*'*80
- sys.stdout.flush()
- return port
+ return self.queue.get()
+
+ def get_status(self):
+ """Returns current queue size and total number of ports used."""
+ try:
+ qs = self.queue.qsize()
+ except NotImplementedError:
+ # May not work on OS X so we return a dummy.
+ qs = len(self.ports)
+
+ return qs, len(self.ports)
def run(self):
"""Run server which returns an available server port where code
can be executed.
"""
- server = SimpleXMLRPCServer(("0.0.0.0", self.my_port))
- self.server = server
- server.register_instance(self)
- server.serve_forever()
+ # We start the code servers here to ensure they are run as nobody.
+ self._start_code_servers()
+ IOLoop.current().start()
def stop(self):
"""Stop all the code server processes.
"""
for proc in self.processes:
proc.terminate()
+ IOLoop.current().stop()
+
+
+class MainHandler(RequestHandler):
+ def initialize(self, server):
+ self.server = server
+
+ def get(self):
+ path = self.request.path[1:]
+ if len(path) == 0:
+ port = self.server.get_server_port()
+ self.write(str(port))
+ elif path == "status":
+ q_size, total = self.server.get_status()
+ result = "%d servers out of %d are free.\n"%(q_size, total)
+ load = float(total - q_size)/total*100
+ result += "Load: %s%%\n"%load
+ self.write(result)
###############################################################################
def main(args=None):
- run_as_nobody()
if args:
- ports = [int(x) for x in args[1:]]
+ ports = [int(x) for x in args]
else:
ports = SERVER_PORTS
server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT)
+ # This is done *after* the server pool is created because when the tornado
+ # app calls listen(), it cannot be nobody.
+ run_as_nobody()
+
server_pool.run()
if __name__ == '__main__':
diff --git a/yaksh/tests/test_code_server.py b/yaksh/tests/test_code_server.py
index 18510c6..a73f073 100644
--- a/yaksh/tests/test_code_server.py
+++ b/yaksh/tests/test_code_server.py
@@ -1,17 +1,15 @@
import json
-from multiprocessing import Process
try:
from Queue import Queue
except ImportError:
from queue import Queue
from threading import Thread
import unittest
-
+import urllib
from yaksh.code_server import ServerPool, SERVER_POOL_PORT
-
from yaksh import settings
-from yaksh.xmlrpc_clients import code_server
+from yaksh.xmlrpc_clients import CodeServerProxy
class TestCodeServer(unittest.TestCase):
@@ -23,24 +21,26 @@ class TestCodeServer(unittest.TestCase):
ports = range(8001, 8006)
server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT)
cls.server_pool = server_pool
- cls.server_proc = p = Process(target=server_pool.run)
- p.start()
-
+ cls.server_thread = t = Thread(target=server_pool.run)
+ t.start()
@classmethod
def tearDownClass(cls):
cls.server_pool.stop()
- cls.server_proc.terminate()
+ cls.server_thread.join()
settings.code_evaluators['python']['standardtestcase'] = \
"python_assertion_evaluator.PythonAssertionEvaluator"
+ def setUp(self):
+ self.code_server = CodeServerProxy()
+
def test_inifinite_loop(self):
# Given
testdata = {'user_answer': 'while True: pass',
'test_case_data': [{'test_case':'assert 1==2'}]}
# When
- result = code_server.run_code(
+ result = self.code_server.run_code(
'python', 'standardtestcase', json.dumps(testdata), ''
)
@@ -55,7 +55,7 @@ class TestCodeServer(unittest.TestCase):
'test_case_data': [{'test_case':'assert f() == 1'}]}
# When
- result = code_server.run_code(
+ result = self.code_server.run_code(
'python', 'standardtestcase', json.dumps(testdata), ''
)
@@ -70,7 +70,7 @@ class TestCodeServer(unittest.TestCase):
'test_case_data': [{'test_case':'assert f() == 2'}]}
# When
- result = code_server.run_code(
+ result = self.code_server.run_code(
'python', 'standardtestcase', json.dumps(testdata), ''
)
@@ -87,12 +87,12 @@ class TestCodeServer(unittest.TestCase):
"""Run an infinite loop."""
testdata = {'user_answer': 'while True: pass',
'test_case_data': [{'test_case':'assert 1==2'}]}
- result = code_server.run_code(
+ result = self.code_server.run_code(
'python', 'standardtestcase', json.dumps(testdata), ''
)
results.put(json.loads(result))
- N = 5
+ N = 10
# When
import time
threads = []
@@ -112,6 +112,19 @@ class TestCodeServer(unittest.TestCase):
self.assertFalse(data['success'])
self.assertTrue('infinite loop' in data['error'])
+ def test_server_pool_status(self):
+ # Given
+ url = "http://localhost:%s/status"%SERVER_POOL_PORT
+
+ # When
+ data = urllib.urlopen(url).read()
+
+ # Then
+ expect = 'out of 5 are free'
+ self.assertTrue(expect in data)
+ expect = 'Load:'
+ self.assertTrue(expect in data)
+
if __name__ == '__main__':
unittest.main()
diff --git a/yaksh/xmlrpc_clients.py b/yaksh/xmlrpc_clients.py
index 7124550..6bfe0d6 100644
--- a/yaksh/xmlrpc_clients.py
+++ b/yaksh/xmlrpc_clients.py
@@ -3,6 +3,7 @@ import time
import random
import socket
import json
+import urllib
from settings import SERVER_PORTS, SERVER_POOL_PORT
@@ -21,7 +22,7 @@ class CodeServerProxy(object):
"""
def __init__(self):
pool_url = 'http://localhost:%d' % (SERVER_POOL_PORT)
- self.pool_server = ServerProxy(pool_url)
+ self.pool_url = pool_url
def run_code(self, language, test_case_type, json_data, user_dir):
"""Tests given code (`answer`) with the `test_code` supplied. If the
@@ -34,7 +35,7 @@ class CodeServerProxy(object):
----------
json_data contains;
user_answer : str
- The user's answer for the question.
+ The user's answer for the question.
test_code : str
The test code to check the user code with.
language : str
@@ -57,21 +58,7 @@ class CodeServerProxy(object):
return result
def _get_server(self):
- # Get a suitable server from our pool of servers. This may block. We
- # try about 60 times, essentially waiting at most for about 30 seconds.
- done, count = False, 60
-
- while not done and count > 0:
- try:
- port = self.pool_server.get_server_port()
- except socket.error:
- # Wait a while try again.
- time.sleep(random.random())
- count -= 1
- else:
- done = True
- if not done:
- raise ConnectionError("Couldn't connect to a server!")
+ port = json.loads(urllib.urlopen(self.pool_url).read())
proxy = ServerProxy('http://localhost:%d' % port)
return proxy