diff options
author | ankitjavalkar | 2015-03-05 12:27:02 +0530 |
---|---|---|
committer | ankitjavalkar | 2015-04-26 19:43:19 +0530 |
commit | 9440bff5ae69c1d27f5c9622ca15cb8c603c6174 (patch) | |
tree | 761e4c37f02244232ce876233e026f1c0e894ad6 /testapp/exam/code_server.py | |
parent | 8e2469e937fd4f80ebf2053d6e21c9b670d38ea2 (diff) | |
download | online_test-9440bff5ae69c1d27f5c9622ca15cb8c603c6174.tar.gz online_test-9440bff5ae69c1d27f5c9622ca15cb8c603c6174.tar.bz2 online_test-9440bff5ae69c1d27f5c9622ca15cb8c603c6174.zip |
Code Server code cleanup and code commonification
- Pass question and test case info as json string (info_parameter)
- Return success status and error message as a json string
- Embed user answer and question lang in info_parameter
- Commonify Python code evaluations and assertion test
- Deprecate individual function call based on language
Diffstat (limited to 'testapp/exam/code_server.py')
-rwxr-xr-x | testapp/exam/code_server.py | 1521 |
1 files changed, 870 insertions, 651 deletions
diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py index c34971f..fa5d916 100755 --- a/testapp/exam/code_server.py +++ b/testapp/exam/code_server.py @@ -29,6 +29,7 @@ import signal from multiprocessing import Process, Queue import subprocess import re +import json # Local imports. from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT @@ -53,6 +54,131 @@ def timeout_handler(signum, frame): """A handler for the ALARM signal.""" raise TimeoutException('Code took too long to run.') +def create_delete_signal_handler(action=None, old_handler=None): + if action == "new": # Add a new signal handler for the execution of this code. + prev_handler = signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(SERVER_TIMEOUT) + return prev_handler + elif action == "original": # Set back any original signal handler. + if old_handler is not None: + signal.signal(signal.SIGALRM, old_handler) + return + else: + raise Exception("Signal Handler: object cannot be NoneType") + elif action == "delete": # Cancel the signal if any, see signal.alarm documentation. + signal.alarm(0) + return + else: + raise Exception("Signal Handler: action not specified") + + +############################################################################### +# `TestCode` class. +############################################################################### +class TestCode(object): + """Evaluates and tests the code obtained from Code Server""" + def __init__(self, info_parameter, in_dir=None): + info_parameter = json.loads(info_parameter) + self.test_parameter = info_parameter.get("test_parameter") + self.language = info_parameter.get("language") + self.user_answer = info_parameter.get("user_answer") + self.in_dir = in_dir + + def run_code(self): + self._change_dir(self.in_dir) + + # Create test cases + test_code = self.create_test_case() + # Evaluate, run code and obtain result + result = self.evaluate_code(test_code) + + return result + + def evaluate_code(self, test_code): + success = False + prev_handler = None + + if self.language == "python": + tb = None + prev_handler = create_delete_signal_handler("new") + + try: + submitted = compile(self.user_answer, '<string>', mode='exec') + g = {} + exec submitted in g + _tests = compile(test_code, '<string>', mode='exec') + exec _tests in g + except TimeoutException: + err = self.timeout_msg + except AssertionError: + type, value, tb = sys.exc_info() + info = traceback.extract_tb(tb) + fname, lineno, func, text = info[-1] + text = str(test_code).splitlines()[lineno-1] + err = "{0} {1} in: {2}".format(type.__name__, str(value), text) + except: + type, value = sys.exc_info()[:2] + err = "Error: {0}".format(repr(value)) + else: + success = True + err = 'Correct answer' + finally: + del tb + # Set back any original signal handler. + create_delete_signal_handler("original", prev_handler) + + # Cancel the signal + create_delete_signal_handler("delete") + + if self.language == "c++": + pass + + if self.language == "c": + pass + + if self.language == "java": + pass + + if self.language == "scilab": + pass + + result = {'success': success, 'error': err} + return result + + def compile_code(self): + pass + + def create_test_case(self): + # Create assert based test cases in python + if self.language == "python": + test_code = "" + for test_case in self.test_parameter: + pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \ + else "" + kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \ + if test_case.get('kw_args') else "" + args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args + tcode = "assert {0}({1}) == {2}" \ + .format(test_case.get('func_name'), args, test_case.get('expected_answer')) + test_code += tcode + "\n" + return test_code + + if self.language == "c++": + pass + + if self.language == "c": + pass + + if self.language == "java": + pass + + if self.language == "scilab": + pass + + def _change_dir(self, in_dir): + if in_dir is not None and isdir(in_dir): + os.chdir(in_dir) + ############################################################################### # `CodeServer` class. @@ -68,6 +194,16 @@ class CodeServer(object): 'have an infinite loop in your code.' % SERVER_TIMEOUT self.timeout_msg = msg + def checker(self, info_parameter, in_dir=None): + """Calls the TestCode Class to test the current code""" + tc = TestCode(info_parameter, in_dir) + result = tc.run_code() + + # Put us back into the server pool queue since we are free now. + self.queue.put(self.port) + + return json.dumps(result) + def run_python_code(self, answer, test_parameter, in_dir=None): """Tests given Python function (`answer`) with the `test_code` supplied. If the optional `in_dir` keyword argument is supplied @@ -88,6 +224,7 @@ class CodeServer(object): old_handler = signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(SERVER_TIMEOUT) + test_parameter = json.loads(test_parameter) success = False tb = None @@ -135,659 +272,741 @@ class CodeServer(object): result = {'success': success, 'error': err} return result - def run_bash_code(self, answer, test_parameter, in_dir=None): - """Tests given Bash code (`answer`) with the `test_code` supplied. - - The testcode should typically contain two lines, the first is a path to - the reference script we are to compare against. The second is a path - to the arguments to be supplied to the reference and submitted script. - The output of these will be compared for correctness. - - If the path's start with a "/" then we assume they are absolute paths. - If not, we assume they are relative paths w.r.t. the location of this - code_server script. - - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original - when done). - - Returns - ------- - - A tuple: (success, error message). - - """ - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) - - def _set_exec(fname): - os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR - | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP - | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) - submit_f = open('submit.sh', 'w') - submit_f.write(answer.lstrip()) - submit_f.close() - submit_path = abspath(submit_f.name) - _set_exec(submit_path) - - # ref path and path to arguments is a comma seperated string obtained - #from ref_code_path field in TestCase Mode - path_list = test_parameter.get('ref_code_path').split(',') - ref_path = path_list[0].strip() if path_list[0] else "" - test_case_path = path_list[1].strip() if path_list[1] else "" - - if not ref_path.startswith('/'): - ref_path = join(MY_DIR, ref_path) - if not test_case_path.startswith('/'): - test_case_path = join(MY_DIR, test_case_path) - - # Add a new signal handler for the execution of this code. - old_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(SERVER_TIMEOUT) - - # Do whatever testing needed. - success = False - try: - success, err = self.check_bash_script(ref_path, submit_path, - test_case_path) - except TimeoutException: - err = self.timeout_msg - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - signal.signal(signal.SIGALRM, old_handler) - - # Delete the created file. - os.remove(submit_path) - - # Cancel the signal if any, see signal.alarm documentation. - signal.alarm(0) - - # Put us back into the server pool queue since we are free now. - self.queue.put(self.port) - - result = {'success': success, 'error': err} - return result - - def _run_command(self, cmd_args, *args, **kw): - """Run a command in a subprocess while blocking, the process is killed - if it takes more than 2 seconds to run. Return the Popen object, the - stdout and stderr. - """ - try: - proc = subprocess.Popen(cmd_args, *args, **kw) - stdout, stderr = proc.communicate() - except TimeoutException: - # Runaway code, so kill it. - proc.kill() - # Re-raise exception. - raise - return proc, stdout, stderr - - def check_bash_script(self, ref_script_path, submit_script_path, - test_case_path=None): - """ Function validates student script using instructor script as - reference. Test cases can optionally be provided. The first argument - ref_script_path, is the path to instructor script, it is assumed to - have executable permission. The second argument submit_script_path, is - the path to the student script, it is assumed to have executable - permission. The Third optional argument is the path to test the - scripts. Each line in this file is a test case and each test case is - passed to the script as standard arguments. - - Returns - -------- - - returns (True, "Correct answer") : If the student script passes all - test cases/have same output, when compared to the instructor script - - returns (False, error_msg): If the student script fails a single - test/have dissimilar output, when compared to the instructor script. - - Returns (False, error_msg): If mandatory arguments are not files or if - the required permissions are not given to the file(s). - - """ - if not ref_script_path: - return False, "No Test Script path found" - if not isfile(ref_script_path): - return False, "No file at %s" % ref_script_path - if not isfile(submit_script_path): - return False, 'No file at %s' % submit_script_path - if not os.access(ref_script_path, os.X_OK): - return False, 'Script %s is not executable' % ref_script_path - if not os.access(submit_script_path, os.X_OK): - return False, 'Script %s is not executable' % submit_script_path - - if test_case_path is None or "": - ret = self._run_command(ref_script_path, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, inst_stdout, inst_stderr = ret - ret = self._run_command(submit_script_path, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdnt_stdout, stdnt_stderr = ret - if inst_stdout == stdnt_stdout: - return True, 'Correct answer' - else: - err = "Error: expected %s, got %s" % (inst_stderr, - stdnt_stderr) - return False, err - else: - if not isfile(test_case_path): - return False, "No test case at %s" % test_case_path - if not os.access(ref_script_path, os.R_OK): - return False, "Test script %s, not readable" % test_case_path - valid_answer = True # We initially make it one, so that we can - # stop once a test case fails - loop_count = 0 # Loop count has to be greater than or - # equal to one. - # Useful for caching things like empty - # test files,etc. - test_cases = open(test_case_path).readlines() - num_lines = len(test_cases) - for test_case in test_cases: - loop_count += 1 - if valid_answer: - args = [ref_script_path] + [x for x in test_case.split()] - ret = self._run_command(args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, inst_stdout, inst_stderr = ret - args = [submit_script_path]+[x for x in test_case.split()] - ret = self._run_command(args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdnt_stdout, stdnt_stderr = ret - valid_answer = inst_stdout == stdnt_stdout - if valid_answer and (num_lines == loop_count): - return True, "Correct answer" - else: - err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr, - stdnt_stdout+stdnt_stderr) - return False, err - - def run_c_code(self, answer, test_parameter, in_dir=None): - """Tests given C code (`answer`) with the `test_code` supplied. - - The testcode is a path to the reference code. - The reference code will call the function submitted by the student. - The reference code will check for the expected output. - - If the path's start with a "/" then we assume they are absolute paths. - If not, we assume they are relative paths w.r.t. the location of this - code_server script. - - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original - when done). - - Returns - ------- - - A tuple: (success, error message). - - """ - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) - - # File extension must be .c - submit_f = open('submit.c', 'w') - submit_f.write(answer.lstrip()) - submit_f.close() - submit_path = abspath(submit_f.name) - - ref_path = test_parameter.get('ref_code_path').strip() - if not ref_path.startswith('/'): - ref_path = join(MY_DIR, ref_path) - - # Add a new signal handler for the execution of this code. - old_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(SERVER_TIMEOUT) - - # Do whatever testing needed. - success = False - try: - success, err = self._check_c_cpp_code(ref_path, submit_path) - except TimeoutException: - err = self.timeout_msg - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - signal.signal(signal.SIGALRM, old_handler) - - # Delete the created file. - os.remove(submit_path) - - # Cancel the signal if any, see signal.alarm documentation. - signal.alarm(0) - - # Put us back into the server pool queue since we are free now. - self.queue.put(self.port) - - result = {'success': success, 'error': err} - return result - - def _compile_command(self, cmd, *args, **kw): - """Compiles C/C++/java code and returns errors if any. - Run a command in a subprocess while blocking, the process is killed - if it takes more than 2 seconds to run. Return the Popen object, the - stderr. - """ - try: - proc_compile = subprocess.Popen(cmd, shell=True, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out, err = proc_compile.communicate() - except TimeoutException: - # Runaway code, so kill it. - proc_compile.kill() - # Re-raise exception. - raise - return proc_compile, err - - def _check_c_cpp_code(self, ref_code_path, submit_code_path): - """ Function validates student code using instructor code as - reference.The first argument ref_code_path, is the path to - instructor code, it is assumed to have executable permission. - The second argument submit_code_path, is the path to the student - code, it is assumed to have executable permission. - - Returns - -------- - - returns (True, "Correct answer") : If the student function returns - expected output when called by reference code. - - returns (False, error_msg): If the student function fails to return - expected output when called by reference code. - - Returns (False, error_msg): If mandatory arguments are not files or - if the required permissions are not given to the file(s). - - """ - if not isfile(ref_code_path): - return False, "No file at %s" % ref_code_path - if not isfile(submit_code_path): - return False, 'No file at %s' % submit_code_path - - success = False - output_path = os.getcwd() + '/output' - compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path) - ret = self._compile_command(compile_command) - proc, stdnt_stderr = ret - - # Only if compilation is successful, the program is executed - # And tested with testcases - if stdnt_stderr == '': - executable = os.getcwd() + '/executable' - compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path, - executable) - ret = self._compile_command(compile_main) - proc, main_err = ret - if main_err == '': - args = [executable] - ret = self._run_command(args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - if proc.returncode == 0: - success, err = True, "Correct answer" - else: - err = stdout + "\n" + stderr - os.remove(executable) - else: - err = "Error:" - try: - error_lines = main_err.splitlines() - for e in error_lines: - err = err + "\n" + e.split(":", 1)[1] - except: - err = err + "\n" + main_err - os.remove(output_path) - else: - err = "Compilation Error:" - try: - error_lines = stdnt_stderr.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + stdnt_stderr - return success, err - - def run_cplus_code(self, answer, test_parameter, in_dir=None): - """Tests given C++ code (`answer`) with the `test_code` supplied. - - The testcode is a path to the reference code. - The reference code will call the function submitted by the student. - The reference code will check for the expected output. - - If the path's start with a "/" then we assume they are absolute paths. - If not, we assume they are relative paths w.r.t. the location of this - code_server script. - - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original - when done). - - Returns - ------- - - A tuple: (success, error message). - - """ - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) - - # The file extension must be .cpp - submit_f = open('submitstd.cpp', 'w') - submit_f.write(answer.lstrip()) - submit_f.close() - submit_path = abspath(submit_f.name) - - ref_path = test_parameter.get('ref_code_path').strip() - if not ref_path.startswith('/'): - ref_path = join(MY_DIR, ref_path) - - # Add a new signal handler for the execution of this code. - old_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(SERVER_TIMEOUT) - - # Do whatever testing needed. - success = False - try: - success, err = self._check_c_cpp_code(ref_path, submit_path) - except TimeoutException: - err = self.timeout_msg - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - signal.signal(signal.SIGALRM, old_handler) - - # Delete the created file. - os.remove(submit_path) - - # Cancel the signal if any, see signal.alarm documentation. - signal.alarm(0) - - # Put us back into the server pool queue since we are free now. - self.queue.put(self.port) - - result = {'success': success, 'error': err} - return result - - def run_java_code(self, answer, test_parameter, in_dir=None): - """Tests given java code (`answer`) with the `test_code` supplied. - - The testcode is a path to the reference code. - The reference code will call the function submitted by the student. - The reference code will check for the expected output. - - If the path's start with a "/" then we assume they are absolute paths. - If not, we assume they are relative paths w.r.t. the location of this - code_server script. - - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original - when done). - - Returns - ------- - - A tuple: (success, error message). - - """ - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) - - # The file extension must be .java - # The class name and file name must be same in java - submit_f = open('Test.java', 'w') - submit_f.write(answer.lstrip()) - submit_f.close() - submit_path = abspath(submit_f.name) - - ref_path = test_parameter.get('ref_code_path').strip() - if not ref_path.startswith('/'): - ref_path = join(MY_DIR, ref_path) - - # Add a new signal handler for the execution of this code. - old_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(SERVER_TIMEOUT) - - # Do whatever testing needed. - success = False - try: - success, err = self._check_java_code(ref_path, submit_path) - except TimeoutException: - err = self.timeout_msg - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - signal.signal(signal.SIGALRM, old_handler) - - # Delete the created file. - os.remove(submit_path) - - # Cancel the signal if any, see signal.alarm documentation. - signal.alarm(0) - - # Put us back into the server pool queue since we are free now. - self.queue.put(self.port) - - return success, err - - def _check_java_code(self, ref_code_path, submit_code_path): - """ Function validates student code using instructor code as - reference.The first argument ref_code_path, is the path to - instructor code, it is assumed to have executable permission. - The second argument submit_code_path, is the path to the student - code, it is assumed to have executable permission. - - Returns - -------- - - returns (True, "Correct answer") : If the student function returns - expected output when called by reference code. - - returns (False, error_msg): If the student function fails to return - expected output when called by reference code. - - Returns (False, error_msg): If mandatory arguments are not files or - if the required permissions are not given to the file(s). - - """ - if not isfile(ref_code_path): - return False, "No file at %s" % ref_code_path - if not isfile(submit_code_path): - return False, 'No file at %s' % submit_code_path - - success = False - compile_command = "javac %s" % (submit_code_path) - ret = self._compile_command(compile_command) - proc, stdnt_stderr = ret - stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr) - - # Only if compilation is successful, the program is executed - # And tested with testcases - if stdnt_stderr == '': - student_directory = os.getcwd() + '/' - student_file_name = "Test" - compile_main = "javac %s -classpath %s -d %s" % (ref_code_path, - student_directory, - student_directory) - ret = self._compile_command(compile_main) - proc, main_err = ret - main_err = self._remove_null_substitute_char(main_err) - - if main_err == '': - main_file_name = (ref_code_path.split('/')[-1]).split('.')[0] - run_command = "java -cp %s %s" % (student_directory, - main_file_name) - ret = self._run_command(run_command, - stdin=None, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - if proc.returncode == 0: - success, err = True, "Correct answer" - else: - err = stdout + "\n" + stderr - success = False - os.remove("%s%s.class" % (student_directory, main_file_name)) - else: - err = "Error:\n" - try: - error_lines = main_err.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + main_err - os.remove("%s%s.class" % (student_directory, student_file_name)) - else: - err = "Compilation Error:\n" - try: - error_lines = stdnt_stderr.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + stdnt_stderr - result = {'success': success, 'error': err} - return result - - def _remove_null_substitute_char(self, string): - """Returns a string without any null and substitute characters""" - stripped = "" - for c in string: - if ord(c) is not 26 and ord(c) is not 0: - stripped = stripped + c - return ''.join(stripped) +# ############################################################################### +# # `CodeServer` class. +# ############################################################################### +# class CodeServer(object): +# """A code server that executes user submitted test code, tests it and +# reports if the code was correct or not. +# """ +# def __init__(self, port, queue): +# self.port = port +# self.queue = queue +# msg = 'Code took more than %s seconds to run. You probably '\ +# 'have an infinite loop in your code.' % SERVER_TIMEOUT +# self.timeout_msg = msg + +# def run_python_code(self, answer, test_parameter, in_dir=None): +# """Tests given Python function (`answer`) with the `test_code` +# supplied. If the optional `in_dir` keyword argument is supplied +# it changes the directory to that directory (it does not change +# it back to the original when done). This function also timesout +# when the function takes more than SERVER_TIMEOUT seconds to run +# to prevent runaway code. +# Returns +# ------- + +# A tuple: (success, error message). + +# """ +# if in_dir is not None and isdir(in_dir): +# os.chdir(in_dir) + +# # Add a new signal handler for the execution of this code. +# old_handler = signal.signal(signal.SIGALRM, timeout_handler) +# signal.alarm(SERVER_TIMEOUT) + +# test_parameter = json.loads(test_parameter) +# success = False +# tb = None + +# test_code = "" +# for test_case in test_parameter: +# pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \ +# else "" +# kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \ +# if test_case.get('kw_args') else "" +# args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args +# tcode = "assert {0}({1}) == {2}" \ +# .format(test_case.get('func_name'), args, test_case.get('expected_answer')) +# test_code += tcode + "\n" +# try: +# submitted = compile(answer, '<string>', mode='exec') +# g = {} +# exec submitted in g +# _tests = compile(test_code, '<string>', mode='exec') +# exec _tests in g +# except TimeoutException: +# err = self.timeout_msg +# except AssertionError: +# type, value, tb = sys.exc_info() +# info = traceback.extract_tb(tb) +# fname, lineno, func, text = info[-1] +# text = str(test_code).splitlines()[lineno-1] +# err = "{0} {1} in: {2}".format(type.__name__, str(value), text) +# except: +# type, value = sys.exc_info()[:2] +# err = "Error: {0}".format(repr(value)) +# else: +# success = True +# err = 'Correct answer' +# finally: +# del tb +# # Set back any original signal handler. +# signal.signal(signal.SIGALRM, old_handler) + +# # Cancel the signal if any, see signal.alarm documentation. +# signal.alarm(0) + +# # Put us back into the server pool queue since we are free now. +# self.queue.put(self.port) + +# result = {'success': success, 'error': err} +# return result + +# def run_bash_code(self, answer, test_parameter, in_dir=None): +# """Tests given Bash code (`answer`) with the `test_code` supplied. + +# The testcode should typically contain two lines, the first is a path to +# the reference script we are to compare against. The second is a path +# to the arguments to be supplied to the reference and submitted script. +# The output of these will be compared for correctness. + +# If the path's start with a "/" then we assume they are absolute paths. +# If not, we assume they are relative paths w.r.t. the location of this +# code_server script. + +# If the optional `in_dir` keyword argument is supplied it changes the +# directory to that directory (it does not change it back to the original +# when done). + +# Returns +# ------- + +# A tuple: (success, error message). + +# """ +# if in_dir is not None and isdir(in_dir): +# os.chdir(in_dir) + +# def _set_exec(fname): +# os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR +# | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP +# | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) +# submit_f = open('submit.sh', 'w') +# submit_f.write(answer.lstrip()) +# submit_f.close() +# submit_path = abspath(submit_f.name) +# _set_exec(submit_path) + +# # ref path and path to arguments is a comma seperated string obtained +# #from ref_code_path field in TestCase Mode +# path_list = test_parameter.get('ref_code_path').split(',') +# ref_path = path_list[0].strip() if path_list[0] else "" +# test_case_path = path_list[1].strip() if path_list[1] else "" + +# if not ref_path.startswith('/'): +# ref_path = join(MY_DIR, ref_path) +# if not test_case_path.startswith('/'): +# test_case_path = join(MY_DIR, test_case_path) + +# # Add a new signal handler for the execution of this code. +# old_handler = signal.signal(signal.SIGALRM, timeout_handler) +# signal.alarm(SERVER_TIMEOUT) + +# # Do whatever testing needed. +# success = False +# try: +# success, err = self.check_bash_script(ref_path, submit_path, +# test_case_path) +# except TimeoutException: +# err = self.timeout_msg +# except: +# type, value = sys.exc_info()[:2] +# err = "Error: {0}".format(repr(value)) +# finally: +# # Set back any original signal handler. +# signal.signal(signal.SIGALRM, old_handler) + +# # Delete the created file. +# os.remove(submit_path) + +# # Cancel the signal if any, see signal.alarm documentation. +# signal.alarm(0) + +# # Put us back into the server pool queue since we are free now. +# self.queue.put(self.port) + +# result = {'success': success, 'error': err} +# return result + +# def _run_command(self, cmd_args, *args, **kw): +# """Run a command in a subprocess while blocking, the process is killed +# if it takes more than 2 seconds to run. Return the Popen object, the +# stdout and stderr. +# """ +# try: +# proc = subprocess.Popen(cmd_args, *args, **kw) +# stdout, stderr = proc.communicate() +# except TimeoutException: +# # Runaway code, so kill it. +# proc.kill() +# # Re-raise exception. +# raise +# return proc, stdout, stderr + +# def check_bash_script(self, ref_script_path, submit_script_path, +# test_case_path=None): +# """ Function validates student script using instructor script as +# reference. Test cases can optionally be provided. The first argument +# ref_script_path, is the path to instructor script, it is assumed to +# have executable permission. The second argument submit_script_path, is +# the path to the student script, it is assumed to have executable +# permission. The Third optional argument is the path to test the +# scripts. Each line in this file is a test case and each test case is +# passed to the script as standard arguments. + +# Returns +# -------- + +# returns (True, "Correct answer") : If the student script passes all +# test cases/have same output, when compared to the instructor script + +# returns (False, error_msg): If the student script fails a single +# test/have dissimilar output, when compared to the instructor script. + +# Returns (False, error_msg): If mandatory arguments are not files or if +# the required permissions are not given to the file(s). + +# """ +# if not ref_script_path: +# return False, "No Test Script path found" +# if not isfile(ref_script_path): +# return False, "No file at %s" % ref_script_path +# if not isfile(submit_script_path): +# return False, 'No file at %s' % submit_script_path +# if not os.access(ref_script_path, os.X_OK): +# return False, 'Script %s is not executable' % ref_script_path +# if not os.access(submit_script_path, os.X_OK): +# return False, 'Script %s is not executable' % submit_script_path + +# if test_case_path is None or "": +# ret = self._run_command(ref_script_path, stdin=None, +# stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# proc, inst_stdout, inst_stderr = ret +# ret = self._run_command(submit_script_path, stdin=None, +# stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# proc, stdnt_stdout, stdnt_stderr = ret +# if inst_stdout == stdnt_stdout: +# return True, 'Correct answer' +# else: +# err = "Error: expected %s, got %s" % (inst_stderr, +# stdnt_stderr) +# return False, err +# else: +# if not isfile(test_case_path): +# return False, "No test case at %s" % test_case_path +# if not os.access(ref_script_path, os.R_OK): +# return False, "Test script %s, not readable" % test_case_path +# valid_answer = True # We initially make it one, so that we can +# # stop once a test case fails +# loop_count = 0 # Loop count has to be greater than or +# # equal to one. +# # Useful for caching things like empty +# # test files,etc. +# test_cases = open(test_case_path).readlines() +# num_lines = len(test_cases) +# for test_case in test_cases: +# loop_count += 1 +# if valid_answer: +# args = [ref_script_path] + [x for x in test_case.split()] +# ret = self._run_command(args, stdin=None, +# stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# proc, inst_stdout, inst_stderr = ret +# args = [submit_script_path]+[x for x in test_case.split()] +# ret = self._run_command(args, stdin=None, +# stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# proc, stdnt_stdout, stdnt_stderr = ret +# valid_answer = inst_stdout == stdnt_stdout +# if valid_answer and (num_lines == loop_count): +# return True, "Correct answer" +# else: +# err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr, +# stdnt_stdout+stdnt_stderr) +# return False, err + +# def run_c_code(self, answer, test_parameter, in_dir=None): +# """Tests given C code (`answer`) with the `test_code` supplied. + +# The testcode is a path to the reference code. +# The reference code will call the function submitted by the student. +# The reference code will check for the expected output. + +# If the path's start with a "/" then we assume they are absolute paths. +# If not, we assume they are relative paths w.r.t. the location of this +# code_server script. + +# If the optional `in_dir` keyword argument is supplied it changes the +# directory to that directory (it does not change it back to the original +# when done). + +# Returns +# ------- + +# A tuple: (success, error message). + +# """ +# if in_dir is not None and isdir(in_dir): +# os.chdir(in_dir) + +# # File extension must be .c +# submit_f = open('submit.c', 'w') +# submit_f.write(answer.lstrip()) +# submit_f.close() +# submit_path = abspath(submit_f.name) + +# ref_path = test_parameter.get('ref_code_path').strip() +# if not ref_path.startswith('/'): +# ref_path = join(MY_DIR, ref_path) + +# # Add a new signal handler for the execution of this code. +# old_handler = signal.signal(signal.SIGALRM, timeout_handler) +# signal.alarm(SERVER_TIMEOUT) + +# # Do whatever testing needed. +# success = False +# try: +# success, err = self._check_c_cpp_code(ref_path, submit_path) +# except TimeoutException: +# err = self.timeout_msg +# except: +# type, value = sys.exc_info()[:2] +# err = "Error: {0}".format(repr(value)) +# finally: +# # Set back any original signal handler. +# signal.signal(signal.SIGALRM, old_handler) + +# # Delete the created file. +# os.remove(submit_path) + +# # Cancel the signal if any, see signal.alarm documentation. +# signal.alarm(0) + +# # Put us back into the server pool queue since we are free now. +# self.queue.put(self.port) + +# result = {'success': success, 'error': err} +# return result + +# def _compile_command(self, cmd, *args, **kw): +# """Compiles C/C++/java code and returns errors if any. +# Run a command in a subprocess while blocking, the process is killed +# if it takes more than 2 seconds to run. Return the Popen object, the +# stderr. +# """ +# try: +# proc_compile = subprocess.Popen(cmd, shell=True, stdin=None, +# stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# out, err = proc_compile.communicate() +# except TimeoutException: +# # Runaway code, so kill it. +# proc_compile.kill() +# # Re-raise exception. +# raise +# return proc_compile, err + +# def _check_c_cpp_code(self, ref_code_path, submit_code_path): +# """ Function validates student code using instructor code as +# reference.The first argument ref_code_path, is the path to +# instructor code, it is assumed to have executable permission. +# The second argument submit_code_path, is the path to the student +# code, it is assumed to have executable permission. + +# Returns +# -------- + +# returns (True, "Correct answer") : If the student function returns +# expected output when called by reference code. + +# returns (False, error_msg): If the student function fails to return +# expected output when called by reference code. + +# Returns (False, error_msg): If mandatory arguments are not files or +# if the required permissions are not given to the file(s). + +# """ +# if not isfile(ref_code_path): +# return False, "No file at %s" % ref_code_path +# if not isfile(submit_code_path): +# return False, 'No file at %s' % submit_code_path + +# success = False +# output_path = os.getcwd() + '/output' +# compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path) +# ret = self._compile_command(compile_command) +# proc, stdnt_stderr = ret + +# # Only if compilation is successful, the program is executed +# # And tested with testcases +# if stdnt_stderr == '': +# executable = os.getcwd() + '/executable' +# compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path, +# executable) +# ret = self._compile_command(compile_main) +# proc, main_err = ret +# if main_err == '': +# args = [executable] +# ret = self._run_command(args, stdin=None, +# stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# proc, stdout, stderr = ret +# if proc.returncode == 0: +# success, err = True, "Correct answer" +# else: +# err = stdout + "\n" + stderr +# os.remove(executable) +# else: +# err = "Error:" +# try: +# error_lines = main_err.splitlines() +# for e in error_lines: +# err = err + "\n" + e.split(":", 1)[1] +# except: +# err = err + "\n" + main_err +# os.remove(output_path) +# else: +# err = "Compilation Error:" +# try: +# error_lines = stdnt_stderr.splitlines() +# for e in error_lines: +# if ':' in e: +# err = err + "\n" + e.split(":", 1)[1] +# else: +# err = err + "\n" + e +# except: +# err = err + "\n" + stdnt_stderr +# return success, err + +# def run_cplus_code(self, answer, test_parameter, in_dir=None): +# """Tests given C++ code (`answer`) with the `test_code` supplied. + +# The testcode is a path to the reference code. +# The reference code will call the function submitted by the student. +# The reference code will check for the expected output. + +# If the path's start with a "/" then we assume they are absolute paths. +# If not, we assume they are relative paths w.r.t. the location of this +# code_server script. + +# If the optional `in_dir` keyword argument is supplied it changes the +# directory to that directory (it does not change it back to the original +# when done). + +# Returns +# ------- + +# A tuple: (success, error message). + +# """ +# if in_dir is not None and isdir(in_dir): +# os.chdir(in_dir) + +# # The file extension must be .cpp +# submit_f = open('submitstd.cpp', 'w') +# submit_f.write(answer.lstrip()) +# submit_f.close() +# submit_path = abspath(submit_f.name) + +# ref_path = test_parameter.get('ref_code_path').strip() +# if not ref_path.startswith('/'): +# ref_path = join(MY_DIR, ref_path) + +# # Add a new signal handler for the execution of this code. +# old_handler = signal.signal(signal.SIGALRM, timeout_handler) +# signal.alarm(SERVER_TIMEOUT) + +# # Do whatever testing needed. +# success = False +# try: +# success, err = self._check_c_cpp_code(ref_path, submit_path) +# except TimeoutException: +# err = self.timeout_msg +# except: +# type, value = sys.exc_info()[:2] +# err = "Error: {0}".format(repr(value)) +# finally: +# # Set back any original signal handler. +# signal.signal(signal.SIGALRM, old_handler) + +# # Delete the created file. +# os.remove(submit_path) + +# # Cancel the signal if any, see signal.alarm documentation. +# signal.alarm(0) + +# # Put us back into the server pool queue since we are free now. +# self.queue.put(self.port) + +# result = {'success': success, 'error': err} +# return result + +# def run_java_code(self, answer, test_parameter, in_dir=None): +# """Tests given java code (`answer`) with the `test_code` supplied. + +# The testcode is a path to the reference code. +# The reference code will call the function submitted by the student. +# The reference code will check for the expected output. + +# If the path's start with a "/" then we assume they are absolute paths. +# If not, we assume they are relative paths w.r.t. the location of this +# code_server script. + +# If the optional `in_dir` keyword argument is supplied it changes the +# directory to that directory (it does not change it back to the original +# when done). + +# Returns +# ------- + +# A tuple: (success, error message). + +# """ +# if in_dir is not None and isdir(in_dir): +# os.chdir(in_dir) + +# # The file extension must be .java +# # The class name and file name must be same in java +# submit_f = open('Test.java', 'w') +# submit_f.write(answer.lstrip()) +# submit_f.close() +# submit_path = abspath(submit_f.name) + +# ref_path = test_parameter.get('ref_code_path').strip() +# if not ref_path.startswith('/'): +# ref_path = join(MY_DIR, ref_path) + +# # Add a new signal handler for the execution of this code. +# old_handler = signal.signal(signal.SIGALRM, timeout_handler) +# signal.alarm(SERVER_TIMEOUT) + +# # Do whatever testing needed. +# success = False +# try: +# success, err = self._check_java_code(ref_path, submit_path) +# except TimeoutException: +# err = self.timeout_msg +# except: +# type, value = sys.exc_info()[:2] +# err = "Error: {0}".format(repr(value)) +# finally: +# # Set back any original signal handler. +# signal.signal(signal.SIGALRM, old_handler) + +# # Delete the created file. +# os.remove(submit_path) + +# # Cancel the signal if any, see signal.alarm documentation. +# signal.alarm(0) + +# # Put us back into the server pool queue since we are free now. +# self.queue.put(self.port) + +# return success, err + +# def _check_java_code(self, ref_code_path, submit_code_path): +# """ Function validates student code using instructor code as +# reference.The first argument ref_code_path, is the path to +# instructor code, it is assumed to have executable permission. +# The second argument submit_code_path, is the path to the student +# code, it is assumed to have executable permission. + +# Returns +# -------- + +# returns (True, "Correct answer") : If the student function returns +# expected output when called by reference code. + +# returns (False, error_msg): If the student function fails to return +# expected output when called by reference code. + +# Returns (False, error_msg): If mandatory arguments are not files or +# if the required permissions are not given to the file(s). + +# """ +# if not isfile(ref_code_path): +# return False, "No file at %s" % ref_code_path +# if not isfile(submit_code_path): +# return False, 'No file at %s' % submit_code_path + +# success = False +# compile_command = "javac %s" % (submit_code_path) +# ret = self._compile_command(compile_command) +# proc, stdnt_stderr = ret +# stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr) + +# # Only if compilation is successful, the program is executed +# # And tested with testcases +# if stdnt_stderr == '': +# student_directory = os.getcwd() + '/' +# student_file_name = "Test" +# compile_main = "javac %s -classpath %s -d %s" % (ref_code_path, +# student_directory, +# student_directory) +# ret = self._compile_command(compile_main) +# proc, main_err = ret +# main_err = self._remove_null_substitute_char(main_err) + +# if main_err == '': +# main_file_name = (ref_code_path.split('/')[-1]).split('.')[0] +# run_command = "java -cp %s %s" % (student_directory, +# main_file_name) +# ret = self._run_command(run_command, +# stdin=None, +# shell=True, +# stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# proc, stdout, stderr = ret +# if proc.returncode == 0: +# success, err = True, "Correct answer" +# else: +# err = stdout + "\n" + stderr +# success = False +# os.remove("%s%s.class" % (student_directory, main_file_name)) +# else: +# err = "Error:\n" +# try: +# error_lines = main_err.splitlines() +# for e in error_lines: +# if ':' in e: +# err = err + "\n" + e.split(":", 1)[1] +# else: +# err = err + "\n" + e +# except: +# err = err + "\n" + main_err +# os.remove("%s%s.class" % (student_directory, student_file_name)) +# else: +# err = "Compilation Error:\n" +# try: +# error_lines = stdnt_stderr.splitlines() +# for e in error_lines: +# if ':' in e: +# err = err + "\n" + e.split(":", 1)[1] +# else: +# err = err + "\n" + e +# except: +# err = err + "\n" + stdnt_stderr +# result = {'success': success, 'error': err} +# return result + +# def _remove_null_substitute_char(self, string): +# """Returns a string without any null and substitute characters""" +# stripped = "" +# for c in string: +# if ord(c) is not 26 and ord(c) is not 0: +# stripped = stripped + c +# return ''.join(stripped) - def run_scilab_code(self, answer, test_parameter, in_dir=None): - """Tests given Scilab function (`answer`) with the `test_code` - supplied. If the optional `in_dir` keyword argument is supplied - it changes the directory to that directory (it does not change - it back to the original when done). This function also timesout - when the function takes more than SERVER_TIMEOUT seconds to run - to prevent runaway code. - - The testcode is a path to the reference code. - The reference code will call the function submitted by the student. - The reference code will check for the expected output. - - If the path's start with a "/" then we assume they are absolute paths. - If not, we assume they are relative paths w.r.t. the location of this - code_server script. - - Returns - ------- - - A tuple: (success, error message). - - """ - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) +# def run_scilab_code(self, answer, test_parameter, in_dir=None): +# """Tests given Scilab function (`answer`) with the `test_code` +# supplied. If the optional `in_dir` keyword argument is supplied +# it changes the directory to that directory (it does not change +# it back to the original when done). This function also timesout +# when the function takes more than SERVER_TIMEOUT seconds to run +# to prevent runaway code. + +# The testcode is a path to the reference code. +# The reference code will call the function submitted by the student. +# The reference code will check for the expected output. + +# If the path's start with a "/" then we assume they are absolute paths. +# If not, we assume they are relative paths w.r.t. the location of this +# code_server script. + +# Returns +# ------- + +# A tuple: (success, error message). + +# """ +# if in_dir is not None and isdir(in_dir): +# os.chdir(in_dir) - # Removes all the commands that terminates scilab - answer,i = self._remove_scilab_exit(answer.lstrip()) - - # Throw message if there are commmands that terminates scilab - add_err="" - if i > 0: - add_err = "Please do not use exit, quit and abort commands in your\ - code.\n Otherwise your code will not be evaluated\ - correctly.\n" - - # The file extension should be .sci - submit_f = open('function.sci','w') - submit_f.write(answer) - submit_f.close() - submit_path = abspath(submit_f.name) - - ref_path = test_parameter.get('ref_code_path').strip() - if not ref_path.startswith('/'): - ref_path = join(MY_DIR, ref_path) - - # Add a new signal handler for the execution of this code. - old_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(SERVER_TIMEOUT) - - # Do whatever testing needed. - success = False - try: - cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path) - cmd += ' | timeout 8 scilab-cli -nb' - ret = self._run_command(cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - - # Get only the error. - stderr = self._get_error(stdout) - if stderr is None: - # Clean output - stdout = self._strip_output(stdout) - if proc.returncode == 5: - success, err = True, "Correct answer" - else: - err = add_err + stdout - else: - err = add_err + stderr - except TimeoutException: - err = self.timeout_msg - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - signal.signal(signal.SIGALRM, old_handler) - - # Delete the created file. - os.remove(submit_path) - - # Cancel the signal if any, see signal.alarm documentation. - signal.alarm(0) - - # Put us back into the server pool queue since we are free now. - self.queue.put(self.port) - - result = {'success': success, 'error': err} - return result - - def _remove_scilab_exit(self, string): - """ - Removes exit, quit and abort from the scilab code - """ - new_string = "" - i=0 - for line in string.splitlines(): - new_line = re.sub(r"exit.*$","",line) - new_line = re.sub(r"quit.*$","",new_line) - new_line = re.sub(r"abort.*$","",new_line) - if line != new_line: - i=i+1 - new_string = new_string +'\n'+ new_line - return new_string, i +# # Removes all the commands that terminates scilab +# answer,i = self._remove_scilab_exit(answer.lstrip()) + +# # Throw message if there are commmands that terminates scilab +# add_err="" +# if i > 0: +# add_err = "Please do not use exit, quit and abort commands in your\ +# code.\n Otherwise your code will not be evaluated\ +# correctly.\n" + +# # The file extension should be .sci +# submit_f = open('function.sci','w') +# submit_f.write(answer) +# submit_f.close() +# submit_path = abspath(submit_f.name) + +# ref_path = test_parameter.get('ref_code_path').strip() +# if not ref_path.startswith('/'): +# ref_path = join(MY_DIR, ref_path) + +# # Add a new signal handler for the execution of this code. +# old_handler = signal.signal(signal.SIGALRM, timeout_handler) +# signal.alarm(SERVER_TIMEOUT) + +# # Do whatever testing needed. +# success = False +# try: +# cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path) +# cmd += ' | timeout 8 scilab-cli -nb' +# ret = self._run_command(cmd, +# shell=True, +# stdout=subprocess.PIPE, +# stderr=subprocess.PIPE) +# proc, stdout, stderr = ret + +# # Get only the error. +# stderr = self._get_error(stdout) +# if stderr is None: +# # Clean output +# stdout = self._strip_output(stdout) +# if proc.returncode == 5: +# success, err = True, "Correct answer" +# else: +# err = add_err + stdout +# else: +# err = add_err + stderr +# except TimeoutException: +# err = self.timeout_msg +# except: +# type, value = sys.exc_info()[:2] +# err = "Error: {0}".format(repr(value)) +# finally: +# # Set back any original signal handler. +# signal.signal(signal.SIGALRM, old_handler) + +# # Delete the created file. +# os.remove(submit_path) + +# # Cancel the signal if any, see signal.alarm documentation. +# signal.alarm(0) + +# # Put us back into the server pool queue since we are free now. +# self.queue.put(self.port) + +# result = {'success': success, 'error': err} +# return result + +# def _remove_scilab_exit(self, string): +# """ +# Removes exit, quit and abort from the scilab code +# """ +# new_string = "" +# i=0 +# for line in string.splitlines(): +# new_line = re.sub(r"exit.*$","",line) +# new_line = re.sub(r"quit.*$","",new_line) +# new_line = re.sub(r"abort.*$","",new_line) +# if line != new_line: +# i=i+1 +# new_string = new_string +'\n'+ new_line +# return new_string, i def _get_error(self, string): """ |