From 3738c8fefa8ac69508bb6daeee045c1f5ea0cb17 Mon Sep 17 00:00:00 2001 From: Prabhu Ramachandran Date: Fri, 12 Aug 2016 16:57:03 +0530 Subject: Add test case for code server. This currently fails when multiple threads ask for a code evaluation at the same time. --- yaksh/code_server.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'yaksh/code_server.py') diff --git a/yaksh/code_server.py b/yaksh/code_server.py index 2d8567e..a2cd08a 100755 --- a/yaksh/code_server.py +++ b/yaksh/code_server.py @@ -62,7 +62,7 @@ class CodeServer(object): """Calls relevant EvaluateCode class based on language to check the answer code """ - code_evaluator = create_evaluator_instance(language, + code_evaluator = create_evaluator_instance(language, test_case_type, json_data, in_dir @@ -107,11 +107,13 @@ class ServerPool(object): queue = Queue(maxsize=len(ports)) self.queue = queue servers = [] + self.processes = [] for port in ports: server = CodeServer(port, queue) servers.append(server) p = Process(target=server.run) p.start() + self.processes.append(p) self.servers = servers # Public Protocol ########## @@ -140,6 +142,12 @@ class ServerPool(object): server.register_instance(self) server.serve_forever() + def stop(self): + """Stop all the code server processes. + """ + for proc in self.processes: + proc.terminate() + ############################################################################### def main(args=None): -- cgit From b39eb538c08d4dd6761d6a970aadf58bde58a7ce Mon Sep 17 00:00:00 2001 From: Prabhu Ramachandran Date: Fri, 12 Aug 2016 21:01:08 +0530 Subject: Use a tornado based server for the pool server. With the previous XMLRPC based server, an XMLRPC server would respond to a request for an available port. This does not work as the server can only take about 2 simultaneous connections. The server pool now uses a HTTP server via tornado which works extremely well. The django code should not change at all as this is an internal change. This change should make the code server far more robust and work for a very large number of simultaneous users. The http server also has a simple status page to indicate the current load. This will not be correct on OSX due to limitations of the multi-processing Queue implementation on OSX. --- yaksh/code_server.py | 114 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 82 insertions(+), 32 deletions(-) (limited to 'yaksh/code_server.py') diff --git a/yaksh/code_server.py b/yaksh/code_server.py index a2cd08a..e19e9c8 100755 --- a/yaksh/code_server.py +++ b/yaksh/code_server.py @@ -1,9 +1,11 @@ #!/usr/bin/env python -"""This server runs an XMLRPC server that can be submitted code and tests -and returns the output. It *should* be run as root and will run as the user -'nobody' so as to minimize any damange by errant code. This can be configured -by editing settings.py to run as many servers as desired. One can also -specify the ports on the command line. Here are examples:: + +"""This server runs an HTTP server (using tornado) and several code servers +using XMLRPC that can be submitted code +and tests and returns the output. It *should* be run as root and will run as +the user 'nobody' so as to minimize any damange by errant code. This can be +configured by editing settings.py to run as many servers as desired. One can +also specify the ports on the command line. Here are examples:: $ sudo ./code_server.py # Runs servers based on settings.py:SERVER_PORTS one server per port given. @@ -17,18 +19,32 @@ All these servers should be running as nobody. This will also start a server pool that defaults to port 50000 and is configurable in settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function that returns an available server. + """ -import sys + +# Standard library imports from SimpleXMLRPCServer import SimpleXMLRPCServer -import pwd +import json +from multiprocessing import Process, Queue import os -import stat from os.path import isdir, dirname, abspath, join, isfile +import pwd +import re import signal -from multiprocessing import Process, Queue +import stat import subprocess -import re -import json +import sys + +try: + from urllib import unquote +except ImportError: + # The above import will not work on Python-3.x. + from urllib.parse import unquote + +# Library imports +from tornado.ioloop import IOLoop +from tornado.web import Application, RequestHandler + # Local imports. from settings import SERVER_PORTS, SERVER_POOL_PORT from language_registry import create_evaluator_instance, unpack_json @@ -104,17 +120,30 @@ class ServerPool(object): """ self.my_port = pool_port self.ports = ports - queue = Queue(maxsize=len(ports)) + queue = Queue(maxsize=len(self.ports)) self.queue = queue servers = [] - self.processes = [] - for port in ports: + processes = [] + for port in self.ports: server = CodeServer(port, queue) servers.append(server) p = Process(target=server.run) - p.start() - self.processes.append(p) + processes.append(p) self.servers = servers + self.processes = processes + self.app = self._make_app() + + def _make_app(self): + app = Application([ + (r"/.*", MainHandler, dict(server=self)), + ]) + app.listen(self.my_port) + return app + + def _start_code_servers(self): + for proc in self.processes: + if proc.pid is None: + proc.start() # Public Protocol ########## @@ -122,42 +151,63 @@ class ServerPool(object): """Get available server port from ones in the pool. This will block till it gets an available server. """ - q = self.queue - was_waiting = True if q.empty() else False - port = q.get() - if was_waiting: - print '*'*80 - print "No available servers, was waiting but got server \ - later at %d." % port - print '*'*80 - sys.stdout.flush() - return port + return self.queue.get() + + def get_status(self): + """Returns current queue size and total number of ports used.""" + try: + qs = self.queue.qsize() + except NotImplementedError: + # May not work on OS X so we return a dummy. + qs = len(self.ports) + + return qs, len(self.ports) def run(self): """Run server which returns an available server port where code can be executed. """ - server = SimpleXMLRPCServer(("0.0.0.0", self.my_port)) - self.server = server - server.register_instance(self) - server.serve_forever() + # We start the code servers here to ensure they are run as nobody. + self._start_code_servers() + IOLoop.current().start() def stop(self): """Stop all the code server processes. """ for proc in self.processes: proc.terminate() + IOLoop.current().stop() + + +class MainHandler(RequestHandler): + def initialize(self, server): + self.server = server + + def get(self): + path = self.request.path[1:] + if len(path) == 0: + port = self.server.get_server_port() + self.write(str(port)) + elif path == "status": + q_size, total = self.server.get_status() + result = "%d servers out of %d are free.\n"%(q_size, total) + load = float(total - q_size)/total*100 + result += "Load: %s%%\n"%load + self.write(result) ############################################################################### def main(args=None): - run_as_nobody() if args: - ports = [int(x) for x in args[1:]] + ports = [int(x) for x in args] else: ports = SERVER_PORTS server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT) + # This is done *after* the server pool is created because when the tornado + # app calls listen(), it cannot be nobody. + run_as_nobody() + server_pool.run() if __name__ == '__main__': -- cgit