From 9000f58786bc21b05e59ddbe96f8be607f13a00d Mon Sep 17 00:00:00 2001 From: Prabhu Ramachandran Date: Fri, 25 Nov 2011 23:57:56 +0530 Subject: ENH: Fixing bash support, tests for code server. This checkin fixes bash support. In actuality the bash support lets one test any runnable script/program that outputs results to stdout. I've also added a decent test suite for the code server that checks if it functions correctly or not. I've also updated the sample_questions to work with the new bash support and added a reference bash script and the testcode to go with it. --- code_server.py | 167 ++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 137 insertions(+), 30 deletions(-) (limited to 'code_server.py') diff --git a/code_server.py b/code_server.py index 82a0f49..4d1663d 100755 --- a/code_server.py +++ b/code_server.py @@ -24,7 +24,7 @@ from SimpleXMLRPCServer import SimpleXMLRPCServer import pwd import os import stat -from os.path import isdir, dirname, abspath, join +from os.path import isdir, dirname, abspath, join, isfile import signal from multiprocessing import Process, Queue import subprocess @@ -62,6 +62,9 @@ class CodeServer(object): def __init__(self, port, queue): self.port = port self.queue = queue + msg = 'Code took more than %s seconds to run. You probably '\ + 'have an infinite loop in your code.'%SERVER_TIMEOUT + self.timeout_msg = msg def run_python_code(self, answer, test_code, in_dir=None): """Tests given Python function (`answer`) with the `test_code` supplied. @@ -92,7 +95,7 @@ class CodeServer(object): _tests = compile(test_code, '', mode='exec') exec _tests in g except TimeoutException: - err = 'Code took more than %s seconds to run.'%SERVER_TIMEOUT + err = self.timeout_msg except AssertionError: type, value, tb = sys.exc_info() info = traceback.extract_tb(tb) @@ -119,9 +122,16 @@ class CodeServer(object): return success, err def run_bash_code(self, answer, test_code, in_dir=None): + """Tests given Bash code (`answer`) with the `test_code` supplied. - """Tests given Bash code (`answer`) with the `test_code` supplied. It - assumes that there are two parts to the test_code separated by '#++++++'. + The testcode should typically contain two lines, the first is a path to + the reference script we are to compare against. The second is a path + to the arguments to be supplied to the reference and submitted script. + The output of these will be compared for correctness. + + If the path's start with a "/" then we assume they are absolute paths. + If not, we assume they are relative paths w.r.t. the location of this + code_server script. If the optional `in_dir` keyword argument is supplied it changes the directory to that directory (it does not change it back to the original when @@ -137,43 +147,140 @@ class CodeServer(object): os.chdir(in_dir) def _set_exec(fname): - os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR - |stat.S_IRGRP|stat.S_IWGRP|stat.S_IXGRP - |stat.S_IROTH|stat.S_IWOTH|stat.S_IXOTH) - - # XXX: fix this to not hardcode it to 6 +'s! - reference, args = test_code.split('#++++++') - ref_f = open('reference.sh', 'w') - ref_f.write(reference); ref_f.close() - ref_fname = abspath(ref_f.name) - _set_exec(ref_fname) - args_f = open('reference.args', 'w') - args_f.write(args); args_f.close() - _set_exec(args_f.name) + os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR + |stat.S_IRGRP|stat.S_IWGRP|stat.S_IXGRP + |stat.S_IROTH|stat.S_IWOTH|stat.S_IXOTH) submit_f = open('submit.sh', 'w') - submit_f.write(answer); submit_f.close() - submit_fname = abspath(submit_f.name) - _set_exec(submit_fname) + submit_f.write(answer.lstrip()); submit_f.close() + submit_path = abspath(submit_f.name) + _set_exec(submit_path) + + ref_path, test_case_path = test_code.strip().splitlines() + if not ref_path.startswith('/'): + ref_path = join(MY_DIR, ref_path) + if not test_case_path.startswith('/'): + test_case_path = join(MY_DIR, test_case_path) - tester = join(MY_DIR, 'shell_script_tester.sh') + # Add a new signal handler for the execution of this code. + old_handler = signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(SERVER_TIMEOUT) - # Run the shell code in a subprocess. + # Do whatever testing needed. try: - output = subprocess.check_output([tester, ref_fname, submit_fname], - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError, exc: + success, err = self.check_bash_script(ref_path, submit_path, test_case_path) + except TimeoutException: success = False - err = 'Error: exist status: %d, message: %s'%(exc.returncode, - exc.output) - else: - success = True - err = 'Correct answer' + err = self.timeout_msg + finally: + # Set back any original signal handler. + signal.signal(signal.SIGALRM, old_handler) + + # Delete the created file. + os.remove(submit_path) + + # Cancel the signal if any, see signal.alarm documentation. + signal.alarm(0) # Put us back into the server pool queue since we are free now. self.queue.put(self.port) + return success, err + def _run_command(self, cmd_args, *args, **kw): + """Run a command in a subprocess while blocking, the process is killed + if it takes more than 2 seconds to run. Return the Popen object, the + stdout and stderr. + """ + try: + proc = subprocess.Popen(cmd_args, *args, **kw) + stdout, stderr = proc.communicate() + except TimeoutException: + # Runaway code, so kill it. + proc.kill() + # Re-raise exception. + raise + return proc, stdout, stderr + + def check_bash_script(self, ref_script_path, submit_script_path, + test_case_path=None): + """ Function validates student script using instructor script as + reference. Test cases can optionally be provided. The first argument + ref_script_path, is the path to instructor script, it is assumed to + have executable permission. The second argument submit_script_path, is + the path to the student script, it is assumed to have executable + permission. The Third optional argument is the path to test the + scripts. Each line in this file is a test case and each test case is + passed to the script as standard arguments. + + Returns + -------- + + returns (True, "Correct answer") : If the student script passes all test + cases/have same output, when compared to the instructor script + + returns (False, error_msg): If + the student script fails a single test/have dissimilar output, when + compared to the instructor script. + + Returns (False, error_msg): If mandatory arguments are not files or if the + required permissions are not given to the file(s). + + """ + if not isfile(ref_script_path): + return False, "No file at %s"%ref_script_path + if not isfile(submit_script_path): + return False, 'No file at %s'%submit_script_path + if not os.access(ref_script_path, os.X_OK): + return False, 'Script %s is not executable'%ref_script_path + if not os.access(submit_script_path, os.X_OK): + return False, 'Script %s is not executable'%submit_script_path + + if test_case_path is None: + ret = self._run_command(ref_script_path, stdin=None, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc, inst_stdout, inst_stderr = ret + ret = self._run_command(submit_script_path, stdin=None, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc, stdnt_stdout, stdnt_stderr = ret + if inst_stdout == stdnt_stdout: + return True, 'Correct answer' + else: + err = "Error: expected %s, got %s"%(inst_stderr, stdnt_stderr) + return False, err + else: + if not isfile(test_case_path): + return False, "No test case at %s"%test_case_path + if not os.access(ref_script_path, os.R_OK): + return False, "Test script %s, not readable"%test_case_path + valid_answer = True # We initially make it one, so that we can stop + # once a test case fails + loop_count = 0 # Loop count has to be greater than or equal to one. + # Useful for caching things like empty test files,etc. + test_cases = open(test_case_path).readlines() + num_lines = len(test_cases) + for test_case in test_cases: + loop_count += 1 + if valid_answer: + args = [ ref_script_path ] + [x for x in test_case.split()] + ret = self._run_command(args, stdin=None, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc, inst_stdout, inst_stderr = ret + args = [ submit_script_path ] + [x for x in test_case.split()] + ret = self._run_command(args, stdin=None, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc, stdnt_stdout, stdnt_stderr = ret + valid_answer = inst_stdout == stdnt_stdout + if valid_answer and (num_lines == loop_count): + return True, "Correct answer" + else: + err = "Error: expected %s, got %s"%(inst_stdout+inst_stderr, + stdnt_stdout+stdnt_stderr) + return False, err + + def run(self): + """Run XMLRPC server, serving our methods. + """ server = SimpleXMLRPCServer(("localhost", self.port)) self.server = server server.register_instance(self) -- cgit