From d8847656ba79e51c96c6e3650374aaf616c375dc Mon Sep 17 00:00:00 2001 From: ankitjavalkar Date: Thu, 30 Apr 2015 11:21:49 +0530 Subject: Code Review: Code Refactoring --- testapp/exam/bash_code_evaluator.py | 140 +++++++++++++++++++ testapp/exam/c_cpp_code_evaluator.py | 168 ++++++++++++++++++++++ testapp/exam/code_evaluator.py | 255 ++++++++++++++++++++++++++++++++++ testapp/exam/code_server.py | 38 ++--- testapp/exam/evaluate_bash_code.py | 119 ---------------- testapp/exam/evaluate_c_code.py | 135 ------------------ testapp/exam/evaluate_code.py | 191 ------------------------- testapp/exam/evaluate_cpp_code.py | 48 ------- testapp/exam/evaluate_java_code.py | 50 ------- testapp/exam/evaluate_python_code.py | 60 -------- testapp/exam/evaluate_scilab_code.py | 86 ------------ testapp/exam/java_code_evaluator.py | 166 ++++++++++++++++++++++ testapp/exam/language_registry.py | 30 ++-- testapp/exam/python_code_evaluator.py | 85 ++++++++++++ testapp/exam/scilab_code_evaluator.py | 131 +++++++++++++++++ testapp/exam/settings.py | 8 ++ testapp/test_server.py | 78 ++++++----- 17 files changed, 1036 insertions(+), 752 deletions(-) create mode 100644 testapp/exam/bash_code_evaluator.py create mode 100644 testapp/exam/c_cpp_code_evaluator.py create mode 100644 testapp/exam/code_evaluator.py delete mode 100644 testapp/exam/evaluate_bash_code.py delete mode 100644 testapp/exam/evaluate_c_code.py delete mode 100644 testapp/exam/evaluate_code.py delete mode 100644 testapp/exam/evaluate_cpp_code.py delete mode 100644 testapp/exam/evaluate_java_code.py delete mode 100644 testapp/exam/evaluate_python_code.py delete mode 100644 testapp/exam/evaluate_scilab_code.py create mode 100644 testapp/exam/java_code_evaluator.py create mode 100644 testapp/exam/python_code_evaluator.py create mode 100644 testapp/exam/scilab_code_evaluator.py (limited to 'testapp') diff --git a/testapp/exam/bash_code_evaluator.py b/testapp/exam/bash_code_evaluator.py new file mode 100644 index 0000000..60f0bb3 --- /dev/null +++ b/testapp/exam/bash_code_evaluator.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python +import traceback +import pwd +import os +from os.path import join, isfile +import subprocess +import importlib + +# local imports +from code_evaluator import CodeEvaluator +# from language_registry import registry + + +class BashCodeEvaluator(CodeEvaluator): + """Tests the Bash code obtained from Code Server""" + def __init__(self, test_case_data, language, user_answer, + ref_code_path=None, in_dir=None): + super(BashCodeEvaluator, self).__init__(test_case_data, language, user_answer, + ref_code_path, in_dir) + self.submit_path = self.create_submit_code_file('submit.sh') + self.test_case_args = self.setup_code_evaluator() + + def setup_code_evaluator(self): + super(BashCodeEvaluator, self).setup_code_evaluator() + + self.set_file_as_executable(self.submit_path) + get_ref_path, get_test_case_path = self.ref_code_path.strip().split(',') + get_ref_path = get_ref_path.strip() + get_test_case_path = get_test_case_path.strip() + ref_path, test_case_path = self.set_test_code_file_path(get_ref_path, + get_test_case_path) + + return ref_path, self.submit_path, test_case_path + + + # # Public Protocol ########## + # def evaluate_code(self): + # submit_path = self.create_submit_code_file('submit.sh') + # self.set_file_as_executable(submit_path) + # get_ref_path, get_test_case_path = self.ref_code_path.strip().split(',') + # get_ref_path = get_ref_path.strip() + # get_test_case_path = get_test_case_path.strip() + # ref_path, test_case_path = self.set_test_code_file_path(get_ref_path, + # get_test_case_path) + + # success, err = self._check_bash_script(ref_path, submit_path, + # test_case_path) + + # # Delete the created file. + # os.remove(submit_path) + + # return success, err + + + # Private Protocol ########## + def check_code(self, ref_path, submit_path, + test_case_path=None): + """ Function validates student script using instructor script as + reference. Test cases can optionally be provided. The first argument + ref_path, is the path to instructor script, it is assumed to + have executable permission. The second argument submit_path, is + the path to the student script, it is assumed to have executable + permission. The Third optional argument is the path to test the + scripts. Each line in this file is a test case and each test case is + passed to the script as standard arguments. + + Returns + -------- + + returns (True, "Correct answer") : If the student script passes all + test cases/have same output, when compared to the instructor script + + returns (False, error_msg): If the student script fails a single + test/have dissimilar output, when compared to the instructor script. + + Returns (False, error_msg): If mandatory arguments are not files or if + the required permissions are not given to the file(s). + + """ + if not isfile(ref_path): + return False, "No file at %s or Incorrect path" % ref_path + if not isfile(submit_path): + return False, "No file at %s or Incorrect path" % submit_path + if not os.access(ref_path, os.X_OK): + return False, "Script %s is not executable" % ref_path + if not os.access(submit_path, os.X_OK): + return False, "Script %s is not executable" % submit_path + + success = False + + if test_case_path is None or "": + ret = self.run_command(ref_path, stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc, inst_stdout, inst_stderr = ret + ret = self.run_command(submit_path, stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc, stdnt_stdout, stdnt_stderr = ret + if inst_stdout == stdnt_stdout: + return True, "Correct answer" + else: + err = "Error: expected %s, got %s" % (inst_stderr, + stdnt_stderr) + return False, err + else: + if not isfile(test_case_path): + return False, "No test case at %s" % test_case_path + if not os.access(ref_path, os.R_OK): + return False, "Test script %s, not readable" % test_case_path + # valid_answer is True, so that we can stop once a test case fails + valid_answer = True + # loop_count has to be greater than or equal to one. + # Useful for caching things like empty test files,etc. + loop_count = 0 + test_cases = open(test_case_path).readlines() + num_lines = len(test_cases) + for test_case in test_cases: + loop_count += 1 + if valid_answer: + args = [ref_path] + [x for x in test_case.split()] + ret = self.run_command(args, stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc, inst_stdout, inst_stderr = ret + args = [submit_path]+[x for x in test_case.split()] + ret = self.run_command(args, stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc, stdnt_stdout, stdnt_stderr = ret + valid_answer = inst_stdout == stdnt_stdout + if valid_answer and (num_lines == loop_count): + return True, "Correct answer" + else: + err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr, + stdnt_stdout+stdnt_stderr) + return False, err + + +# registry.register('bash', EvaluateBashCode) diff --git a/testapp/exam/c_cpp_code_evaluator.py b/testapp/exam/c_cpp_code_evaluator.py new file mode 100644 index 0000000..d611f96 --- /dev/null +++ b/testapp/exam/c_cpp_code_evaluator.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python +import traceback +import pwd +import os +from os.path import join, isfile +import subprocess +import importlib + +# local imports +from code_evaluator import CodeEvaluator +# from language_registry import registry + + +class CCppCodeEvaluator(CodeEvaluator): + """Tests the C code obtained from Code Server""" + def __init__(self, test_case_data, language, user_answer, + ref_code_path=None, in_dir=None): + super(CCppCodeEvaluator, self).__init__(test_case_data, language, user_answer, + ref_code_path, in_dir) + self.submit_path = self.create_submit_code_file('submit.c') + self.test_case_args = self.setup_code_evaluator() + + # Private Protocol ########## + def setup_code_evaluator(self): + super(CCppCodeEvaluator, self).setup_code_evaluator() + + get_ref_path = self.ref_code_path + ref_path, test_case_path = self.set_test_code_file_path(get_ref_path) + + # Set file paths + c_user_output_path = os.getcwd() + '/output' + c_ref_output_path = os.getcwd() + '/executable' + + # Set command variables + compile_command = 'g++ {0} -c -o {1}'.format(self.submit_path, + c_user_output_path) + compile_main = 'g++ {0} {1} -o {2}'.format(ref_path, + c_user_output_path, + c_ref_output_path) + run_command_args = [c_ref_output_path] + remove_user_output = c_user_output_path + remove_ref_output = c_ref_output_path + + return ref_path, self.submit_path, compile_command, compile_main, run_command_args, remove_user_output, remove_ref_output + + def teardown_code_evaluator(self): + # Delete the created file. + super(CCppCodeEvaluator, self).teardown_code_evaluator() + os.remove(self.submit_path) + + # # Public Protocol ########## + # def evaluate_code(self): + # submit_path = self.create_submit_code_file('submit.c') + # get_ref_path = self.ref_code_path + # ref_path, test_case_path = self.set_test_code_file_path(get_ref_path) + # success = False + + # # Set file paths + # c_user_output_path = os.getcwd() + '/output' + # c_ref_output_path = os.getcwd() + '/executable' + + # # Set command variables + # compile_command = 'g++ {0} -c -o {1}'.format(submit_path, + # c_user_output_path) + # compile_main = 'g++ {0} {1} -o {2}'.format(ref_path, + # c_user_output_path, + # c_ref_output_path) + # run_command_args = [c_ref_output_path] + # remove_user_output = c_user_output_path + # remove_ref_output = c_ref_output_path + + # success, err = self.check_code(ref_path, submit_path, compile_command, + # compile_main, run_command_args, + # remove_user_output, remove_ref_output) + + # # Delete the created file. + # os.remove(submit_path) + + # return success, err + + def check_code(self, ref_code_path, submit_code_path, compile_command, + compile_main, run_command_args, remove_user_output, + remove_ref_output): + """ Function validates student code using instructor code as + reference.The first argument ref_code_path, is the path to + instructor code, it is assumed to have executable permission. + The second argument submit_code_path, is the path to the student + code, it is assumed to have executable permission. + + Returns + -------- + + returns (True, "Correct answer") : If the student function returns + expected output when called by reference code. + + returns (False, error_msg): If the student function fails to return + expected output when called by reference code. + + Returns (False, error_msg): If mandatory arguments are not files or + if the required permissions are not given to the file(s). + + """ + if not isfile(ref_code_path): + return False, "No file at %s or Incorrect path" % ref_code_path + if not isfile(submit_code_path): + return False, 'No file at %s or Incorrect path' % submit_code_path + + success = False + # output_path = os.getcwd() + '/output' + ret = self.compile_command(compile_command) + proc, stdnt_stderr = ret + # if self.language == "java": + stdnt_stderr = self.remove_null_substitute_char(stdnt_stderr) + + # Only if compilation is successful, the program is executed + # And tested with testcases + if stdnt_stderr == '': + ret = self.compile_command(compile_main) + proc, main_err = ret + # if self.language == "java": + main_err = self.remove_null_substitute_char(main_err) + + if main_err == '': + ret = self.run_command(run_command_args, stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc, stdout, stderr = ret + if proc.returncode == 0: + success, err = True, "Correct answer" + else: + err = stdout + "\n" + stderr + os.remove(remove_ref_output) + else: + err = "Error:" + try: + error_lines = main_err.splitlines() + for e in error_lines: + if ':' in e: + err = err + "\n" + e.split(":", 1)[1] + else: + err = err + "\n" + e + except: + err = err + "\n" + main_err + os.remove(remove_user_output) + else: + err = "Compilation Error:" + try: + error_lines = stdnt_stderr.splitlines() + for e in error_lines: + if ':' in e: + err = err + "\n" + e.split(":", 1)[1] + else: + err = err + "\n" + e + except: + err = err + "\n" + stdnt_stderr + + return success, err + + def remove_null_substitute_char(self, string): + """Returns a string without any null and substitute characters""" + stripped = "" + for c in string: + if ord(c) is not 26 and ord(c) is not 0: + stripped = stripped + c + return ''.join(stripped) + + +# registry.register('c', EvaluateCCode) diff --git a/testapp/exam/code_evaluator.py b/testapp/exam/code_evaluator.py new file mode 100644 index 0000000..3cc7374 --- /dev/null +++ b/testapp/exam/code_evaluator.py @@ -0,0 +1,255 @@ +import sys +from SimpleXMLRPCServer import SimpleXMLRPCServer +import pwd +import os +import stat +from os.path import isdir, dirname, abspath, join, isfile +import signal +from multiprocessing import Process, Queue +import subprocess +import re +import json +# Local imports. +from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT + + +MY_DIR = abspath(dirname(__file__)) + + +# Raised when the code times-out. +# c.f. http://pguides.net/python/timeout-a-function +class TimeoutException(Exception): + pass + + +# Private Protocol ########## +def timeout_handler(signum, frame): + """A handler for the ALARM signal.""" + raise TimeoutException('Code took too long to run.') + + +def create_signal_handler(): + """Add a new signal handler for the execution of this code.""" + prev_handler = signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(SERVER_TIMEOUT) + return prev_handler + + +def set_original_signal_handler(old_handler=None): + """Set back any original signal handler.""" + if old_handler is not None: + signal.signal(signal.SIGALRM, old_handler) + return + else: + raise Exception("Signal Handler: object cannot be NoneType") + + +def delete_signal_handler(): + signal.alarm(0) + return + + +############################################################################### +# `TestCode` class. +############################################################################### +class CodeEvaluator(object): + """Tests the code obtained from Code Server""" + def __init__(self, test_case_data, language, user_answer, + ref_code_path=None, in_dir=None): + msg = 'Code took more than %s seconds to run. You probably '\ + 'have an infinite loop in your code.' % SERVER_TIMEOUT + self.timeout_msg = msg + self.test_case_data = test_case_data + self.language = language.lower() + self.user_answer = user_answer + self.ref_code_path = ref_code_path + self.in_dir = in_dir + self.test_case_args = None + + # Public Protocol ########## + @classmethod + def from_json(cls, language, json_data, in_dir): + json_data = json.loads(json_data) + test_case_data = json_data.get("test_case_data") + user_answer = json_data.get("user_answer") + ref_code_path = json_data.get("ref_code_path") + + instance = cls(Test_case_data, language, user_answer, ref_code_path, + in_dir) + return instance + + # def run_code(self): + # """Tests given code (`answer`) with the test cases based on + # given arguments. + + # The ref_code_path is a path to the reference code. + # The reference code will call the function submitted by the student. + # The reference code will check for the expected output. + + # If the path's start with a "/" then we assume they are absolute paths. + # If not, we assume they are relative paths w.r.t. the location of this + # code_server script. + + # If the optional `in_dir` keyword argument is supplied it changes the + # directory to that directory (it does not change it back to the original + # when done). + + # Returns + # ------- + + # A tuple: (success, error message). + # """ + # self._change_dir(self.in_dir) + + # # Add a new signal handler for the execution of this code. + # prev_handler = self.create_signal_handler() + # success = False + + # # Do whatever testing needed. + # try: + # success, err = self.evaluate_code() #pass *list where list is a list of args obtained from setup + + # except TimeoutException: + # err = self.timeout_msg + # except: + # type, value = sys.exc_info()[:2] + # err = "Error: {0}".format(repr(value)) + # finally: + # # Set back any original signal handler. + # self.set_original_signal_handler(prev_handler) + + # # Cancel the signal + # self.delete_signal_handler() + + # result = {'success': success, 'error': err} + # return result + + def code_evaluator(self): + """Tests given code (`answer`) with the test cases based on + given arguments. + + The ref_code_path is a path to the reference code. + The reference code will call the function submitted by the student. + The reference code will check for the expected output. + + If the path's start with a "/" then we assume they are absolute paths. + If not, we assume they are relative paths w.r.t. the location of this + code_server script. + + If the optional `in_dir` keyword argument is supplied it changes the + directory to that directory (it does not change it back to the original + when done). + + Returns + ------- + + A tuple: (success, error message). + """ + + self.setup_code_evaluator() + success, err = self.evaluate_code(self.test_case_args) + self.teardown_code_evaluator() + + result = {'success': success, 'error': err} + return result + + # Public Protocol ########## + def setup_code_evaluator(self): + self._change_dir(self.in_dir) + + def evaluate_code(self, args): + # Add a new signal handler for the execution of this code. + prev_handler = create_signal_handler() + success = False + args = args or [] + + # Do whatever testing needed. + try: + success, err = self.check_code(*args) + + except TimeoutException: + err = self.timeout_msg + except: + _type, value = sys.exc_info()[:2] + err = "Error: {0}".format(repr(value)) + finally: + # Set back any original signal handler. + set_original_signal_handler(prev_handler) + + return success, err + + def teardown_code_evaluator(self): + # Cancel the signal + delete_signal_handler() + + def check_code(self): + raise NotImplementedError("check_code method not implemented") + + # Private Protocol ########## + def create_submit_code_file(self, file_name): + """ Write the code (`answer`) to a file and set the file path""" + submit_f = open(file_name, 'w') + submit_f.write(self.user_answer.lstrip()) + submit_f.close() + submit_path = abspath(submit_f.name) + + return submit_path + + def set_file_as_executable(self, fname): + os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR + | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP + | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) + + def set_test_code_file_path(self, ref_path=None, test_case_path=None): + if ref_path and not ref_path.startswith('/'): + ref_path = join(MY_DIR, ref_path) + + if test_case_path and not test_case_path.startswith('/'): + test_case_path = join(MY_DIR, test_case_path) + + return ref_path, test_case_path + + def run_command(self, cmd_args, *args, **kw): + """Run a command in a subprocess while blocking, the process is killed + if it takes more than 2 seconds to run. Return the Popen object, the + stdout and stderr. + """ + try: + proc = subprocess.Popen(cmd_args, *args, **kw) + stdout, stderr = proc.communicate() + except TimeoutException: + # Runaway code, so kill it. + proc.kill() + # Re-raise exception. + raise + return proc, stdout, stderr + + def compile_command(self, cmd, *args, **kw): + """Compiles C/C++/java code and returns errors if any. + Run a command in a subprocess while blocking, the process is killed + if it takes more than 2 seconds to run. Return the Popen object, the + stderr. + """ + try: + proc_compile = subprocess.Popen(cmd, shell=True, stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = proc_compile.communicate() + except TimeoutException: + # Runaway code, so kill it. + proc_compile.kill() + # Re-raise exception. + raise + return proc_compile, err + + def _change_dir(self, in_dir): + if in_dir is not None and isdir(in_dir): + os.chdir(in_dir) + + def remove_null_substitute_char(self, string): + """Returns a string without any null and substitute characters""" + stripped = "" + for c in string: + if ord(c) is not 26 and ord(c) is not 0: + stripped = stripped + c + return ''.join(stripped) diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py index 697b131..c621dcd 100755 --- a/testapp/exam/code_server.py +++ b/testapp/exam/code_server.py @@ -32,13 +32,13 @@ import json import importlib # Local imports. from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT -from language_registry import registry -from evaluate_python_code import EvaluatePythonCode -from evaluate_c_code import EvaluateCCode -from evaluate_cpp_code import EvaluateCppCode -from evaluate_java_code import EvaluateJavaCode -from evaluate_scilab_code import EvaluateScilabCode -from evaluate_bash_code import EvaluateBashCode +from language_registry import set_registry +# from evaluate_python_code import EvaluatePythonCode +# from evaluate_c_code import EvaluateCCode +# from evaluate_cpp_code import EvaluateCppCode +# from evaluate_java_code import EvaluateJavaCode +# from evaluate_scilab_code import EvaluateScilabCode +# from evaluate_bash_code import EvaluateBashCode MY_DIR = abspath(dirname(__file__)) @@ -69,24 +69,15 @@ class CodeServer(object): """Calls relevant EvaluateCode class based on language to check the answer code """ - evaluate_code_instance = self.create_class_instance(language, - json_data, in_dir) - - result = evaluate_code_instance.run_code() + code_evaluator = self._create_evaluator_instance(language, json_data, + in_dir) + result = code_evaluator.code_evaluator() # Put us back into the server pool queue since we are free now. self.queue.put(self.port) return json.dumps(result) - # Public Protocol ########## - def create_class_instance(self, language, json_data, in_dir): - """Create instance of relevant EvaluateCode class based on language""" - cls = registry.get_class(language) - instance = cls.from_json(language, json_data, in_dir) - return instance - - # Public Protocol ########## def run(self): """Run XMLRPC server, serving our methods.""" server = SimpleXMLRPCServer(("localhost", self.port)) @@ -95,6 +86,15 @@ class CodeServer(object): self.queue.put(self.port) server.serve_forever() + # Private Protocol ########## + def _create_evaluator_instance(self, language, json_data, in_dir): + """Create instance of relevant EvaluateCode class based on language""" + set_registry() + registry = get_registry() + cls = registry.get_class(language) + instance = cls.from_json(language, json_data, in_dir) + return instance + ############################################################################### # `ServerPool` class. diff --git a/testapp/exam/evaluate_bash_code.py b/testapp/exam/evaluate_bash_code.py deleted file mode 100644 index 2905d65..0000000 --- a/testapp/exam/evaluate_bash_code.py +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/env python -import traceback -import pwd -import os -from os.path import join, isfile -import subprocess -import importlib - -# local imports -from evaluate_code import EvaluateCode -from language_registry import registry - - -class EvaluateBashCode(EvaluateCode): - """Tests the Bash code obtained from Code Server""" - # Public Protocol ########## - def evaluate_code(self): - submit_path = self.create_submit_code_file('submit.sh') - self.set_file_as_executable(submit_path) - get_ref_path, get_test_case_path = self.ref_code_path.strip().split(',') - get_ref_path = get_ref_path.strip() - get_test_case_path = get_test_case_path.strip() - ref_path, test_case_path = self.set_test_code_file_path(get_ref_path, - get_test_case_path) - - success, err = self._check_bash_script(ref_path, submit_path, - test_case_path) - - # Delete the created file. - os.remove(submit_path) - - return success, err - - # Private Protocol ########## - def _check_bash_script(self, ref_path, submit_path, - test_case_path=None): - """ Function validates student script using instructor script as - reference. Test cases can optionally be provided. The first argument - ref_path, is the path to instructor script, it is assumed to - have executable permission. The second argument submit_path, is - the path to the student script, it is assumed to have executable - permission. The Third optional argument is the path to test the - scripts. Each line in this file is a test case and each test case is - passed to the script as standard arguments. - - Returns - -------- - - returns (True, "Correct answer") : If the student script passes all - test cases/have same output, when compared to the instructor script - - returns (False, error_msg): If the student script fails a single - test/have dissimilar output, when compared to the instructor script. - - Returns (False, error_msg): If mandatory arguments are not files or if - the required permissions are not given to the file(s). - - """ - if not isfile(ref_path): - return False, "No file at %s or Incorrect path" % ref_path - if not isfile(submit_path): - return False, "No file at %s or Incorrect path" % submit_path - if not os.access(ref_path, os.X_OK): - return False, "Script %s is not executable" % ref_path - if not os.access(submit_path, os.X_OK): - return False, "Script %s is not executable" % submit_path - - success = False - - if test_case_path is None or "": - ret = self.run_command(ref_path, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, inst_stdout, inst_stderr = ret - ret = self.run_command(submit_path, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdnt_stdout, stdnt_stderr = ret - if inst_stdout == stdnt_stdout: - return True, "Correct answer" - else: - err = "Error: expected %s, got %s" % (inst_stderr, - stdnt_stderr) - return False, err - else: - if not isfile(test_case_path): - return False, "No test case at %s" % test_case_path - if not os.access(ref_path, os.R_OK): - return False, "Test script %s, not readable" % test_case_path - # valid_answer is True, so that we can stop once a test case fails - valid_answer = True - # loop_count has to be greater than or equal to one. - # Useful for caching things like empty test files,etc. - loop_count = 0 - test_cases = open(test_case_path).readlines() - num_lines = len(test_cases) - for test_case in test_cases: - loop_count += 1 - if valid_answer: - args = [ref_path] + [x for x in test_case.split()] - ret = self.run_command(args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, inst_stdout, inst_stderr = ret - args = [submit_path]+[x for x in test_case.split()] - ret = self.run_command(args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdnt_stdout, stdnt_stderr = ret - valid_answer = inst_stdout == stdnt_stdout - if valid_answer and (num_lines == loop_count): - return True, "Correct answer" - else: - err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr, - stdnt_stdout+stdnt_stderr) - return False, err - - -registry.register('bash', EvaluateBashCode) diff --git a/testapp/exam/evaluate_c_code.py b/testapp/exam/evaluate_c_code.py deleted file mode 100644 index 0b9e352..0000000 --- a/testapp/exam/evaluate_c_code.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python -import traceback -import pwd -import os -from os.path import join, isfile -import subprocess -import importlib - -# local imports -from evaluate_code import EvaluateCode -from language_registry import registry - - -class EvaluateCCode(EvaluateCode): - """Tests the C code obtained from Code Server""" - # Public Protocol ########## - def evaluate_code(self): - submit_path = self.create_submit_code_file('submit.c') - get_ref_path = self.ref_code_path - ref_path, test_case_path = self.set_test_code_file_path(get_ref_path) - success = False - - # Set file paths - c_user_output_path = os.getcwd() + '/output' - c_ref_output_path = os.getcwd() + '/executable' - - # Set command variables - compile_command = 'g++ {0} -c -o {1}'.format(submit_path, - c_user_output_path) - compile_main = 'g++ {0} {1} -o {2}'.format(ref_path, - c_user_output_path, - c_ref_output_path) - run_command_args = [c_ref_output_path] - remove_user_output = c_user_output_path - remove_ref_output = c_ref_output_path - - success, err = self.check_code(ref_path, submit_path, compile_command, - compile_main, run_command_args, - remove_user_output, remove_ref_output) - - # Delete the created file. - os.remove(submit_path) - - return success, err - - # Public Protocol ########## - def check_code(self, ref_code_path, submit_code_path, compile_command, - compile_main, run_command_args, remove_user_output, - remove_ref_output): - """ Function validates student code using instructor code as - reference.The first argument ref_code_path, is the path to - instructor code, it is assumed to have executable permission. - The second argument submit_code_path, is the path to the student - code, it is assumed to have executable permission. - - Returns - -------- - - returns (True, "Correct answer") : If the student function returns - expected output when called by reference code. - - returns (False, error_msg): If the student function fails to return - expected output when called by reference code. - - Returns (False, error_msg): If mandatory arguments are not files or - if the required permissions are not given to the file(s). - - """ - if not isfile(ref_code_path): - return False, "No file at %s or Incorrect path" % ref_code_path - if not isfile(submit_code_path): - return False, 'No file at %s or Incorrect path' % submit_code_path - - success = False - # output_path = os.getcwd() + '/output' - ret = self.compile_command(compile_command) - proc, stdnt_stderr = ret - # if self.language == "java": - stdnt_stderr = self.remove_null_substitute_char(stdnt_stderr) - - # Only if compilation is successful, the program is executed - # And tested with testcases - if stdnt_stderr == '': - ret = self.compile_command(compile_main) - proc, main_err = ret - # if self.language == "java": - main_err = self.remove_null_substitute_char(main_err) - - if main_err == '': - ret = self.run_command(run_command_args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - if proc.returncode == 0: - success, err = True, "Correct answer" - else: - err = stdout + "\n" + stderr - os.remove(remove_ref_output) - else: - err = "Error:" - try: - error_lines = main_err.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + main_err - os.remove(remove_user_output) - else: - err = "Compilation Error:" - try: - error_lines = stdnt_stderr.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + stdnt_stderr - - return success, err - - # Public Protocol ########## - def remove_null_substitute_char(self, string): - """Returns a string without any null and substitute characters""" - stripped = "" - for c in string: - if ord(c) is not 26 and ord(c) is not 0: - stripped = stripped + c - return ''.join(stripped) - - -registry.register('c', EvaluateCCode) diff --git a/testapp/exam/evaluate_code.py b/testapp/exam/evaluate_code.py deleted file mode 100644 index b9892ed..0000000 --- a/testapp/exam/evaluate_code.py +++ /dev/null @@ -1,191 +0,0 @@ -import sys -from SimpleXMLRPCServer import SimpleXMLRPCServer -import pwd -import os -import stat -from os.path import isdir, dirname, abspath, join, isfile -import signal -from multiprocessing import Process, Queue -import subprocess -import re -import json -import importlib -# Local imports. -from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT - - -MY_DIR = abspath(dirname(__file__)) - - -# Raised when the code times-out. -# c.f. http://pguides.net/python/timeout-a-function -class TimeoutException(Exception): - pass - - -# Private Protocol ########## -def timeout_handler(signum, frame): - """A handler for the ALARM signal.""" - raise TimeoutException('Code took too long to run.') - - -def create_signal_handler(): - """Add a new signal handler for the execution of this code.""" - prev_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(SERVER_TIMEOUT) - return prev_handler - - -def set_original_signal_handler(old_handler=None): - """Set back any original signal handler.""" - if old_handler is not None: - signal.signal(signal.SIGALRM, old_handler) - return - else: - raise Exception("Signal Handler: object cannot be NoneType") - - -def delete_signal_handler(): - signal.alarm(0) - return - - -############################################################################### -# `TestCode` class. -############################################################################### -class EvaluateCode(object): - """Tests the code obtained from Code Server""" - def __init__(self, test_case_data, language, user_answer, - ref_code_path=None, in_dir=None): - msg = 'Code took more than %s seconds to run. You probably '\ - 'have an infinite loop in your code.' % SERVER_TIMEOUT - self.timeout_msg = msg - self.test_case_data = test_case_data - self.language = language.lower() - self.user_answer = user_answer - self.ref_code_path = ref_code_path - self.in_dir = in_dir - - # Public Protocol ########## - - @classmethod - def from_json(cls, language, json_data, in_dir): - json_data = json.loads(json_data) - test_case_data = json_data.get("test_case_data") - user_answer = json_data.get("user_answer") - ref_code_path = json_data.get("ref_code_path") - - instance = cls(Test_case_data, language, user_answer, ref_code_path, - in_dir) - return instance - - def run_code(self): - """Tests given code (`answer`) with the test cases based on - given arguments. - - The ref_code_path is a path to the reference code. - The reference code will call the function submitted by the student. - The reference code will check for the expected output. - - If the path's start with a "/" then we assume they are absolute paths. - If not, we assume they are relative paths w.r.t. the location of this - code_server script. - - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original - when done). - - Returns - ------- - - A tuple: (success, error message). - """ - self._change_dir(self.in_dir) - - # Add a new signal handler for the execution of this code. - prev_handler = create_signal_handler() - success = False - - # Do whatever testing needed. - try: - success, err = self.evaluate_code() - - except TimeoutException: - err = self.timeout_msg - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - set_original_signal_handler(prev_handler) - - # Cancel the signal - delete_signal_handler() - - result = {'success': success, 'error': err} - return result - - def evaluate_code(self): - raise NotImplementedError("evaluate_code method not implemented") - - def create_submit_code_file(self, file_name): - """ Write the code (`answer`) to a file and set the file path""" - submit_f = open(file_name, 'w') - submit_f.write(self.user_answer.lstrip()) - submit_f.close() - submit_path = abspath(submit_f.name) - - return submit_path - - def set_file_as_executable(self, fname): - os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR - | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP - | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) - - def set_test_code_file_path(self, ref_path=None, test_case_path=None): - if ref_path and not ref_path.startswith('/'): - ref_path = join(MY_DIR, ref_path) - - if test_case_path and not test_case_path.startswith('/'): - test_case_path = join(MY_DIR, test_case_path) - - return ref_path, test_case_path - - def run_command(self, cmd_args, *args, **kw): - """Run a command in a subprocess while blocking, the process is killed - if it takes more than 2 seconds to run. Return the Popen object, the - stdout and stderr. - """ - try: - proc = subprocess.Popen(cmd_args, *args, **kw) - stdout, stderr = proc.communicate() - except TimeoutException: - # Runaway code, so kill it. - proc.kill() - # Re-raise exception. - raise - return proc, stdout, stderr - - def compile_command(self, cmd, *args, **kw): - """Compiles C/C++/java code and returns errors if any. - Run a command in a subprocess while blocking, the process is killed - if it takes more than 2 seconds to run. Return the Popen object, the - stderr. - """ - try: - proc_compile = subprocess.Popen(cmd, shell=True, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out, err = proc_compile.communicate() - except TimeoutException: - # Runaway code, so kill it. - proc_compile.kill() - # Re-raise exception. - raise - return proc_compile, err - - # Private Protocol ########## - - def _change_dir(self, in_dir): - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) diff --git a/testapp/exam/evaluate_cpp_code.py b/testapp/exam/evaluate_cpp_code.py deleted file mode 100644 index 987d041..0000000 --- a/testapp/exam/evaluate_cpp_code.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python -import traceback -import pwd -import os -from os.path import join, isfile -import subprocess -import importlib - -# local imports -from evaluate_c_code import EvaluateCCode -from evaluate_code import EvaluateCode -from language_registry import registry - - -class EvaluateCppCode(EvaluateCCode, EvaluateCode): - """Tests the C code obtained from Code Server""" - # Public Protocol ########## - def evaluate_code(self): - submit_path = self.create_submit_code_file('submitstd.cpp') - get_ref_path = self.ref_code_path - ref_path, test_case_path = self.set_test_code_file_path(get_ref_path) - success = False - - # Set file paths - c_user_output_path = os.getcwd() + '/output' - c_ref_output_path = os.getcwd() + '/executable' - - # Set command variables - compile_command = 'g++ {0} -c -o {1}'.format(submit_path, - c_user_output_path) - compile_main = 'g++ {0} {1} -o {2}'.format(ref_path, - c_user_output_path, - c_ref_output_path) - run_command_args = c_ref_output_path - remove_user_output = c_user_output_path - remove_ref_output = c_ref_output_path - - success, err = self.check_code(ref_path, submit_path, compile_command, - compile_main, run_command_args, - remove_user_output, remove_ref_output) - - # Delete the created file. - os.remove(submit_path) - - return success, err - - -registry.register('cpp', EvaluateCppCode) diff --git a/testapp/exam/evaluate_java_code.py b/testapp/exam/evaluate_java_code.py deleted file mode 100644 index d04be4e..0000000 --- a/testapp/exam/evaluate_java_code.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python -import traceback -import pwd -import os -from os.path import join, isfile -import subprocess -import importlib - -# local imports -from evaluate_c_code import EvaluateCCode -from evaluate_code import EvaluateCode -from language_registry import registry - - -class EvaluateJavaCode(EvaluateCCode, EvaluateCode): - """Tests the C code obtained from Code Server""" - # Public Protocol ########## - def evaluate_code(self): - submit_path = self.create_submit_code_file('Test.java') - ref_path, test_case_path = self.set_test_code_file_path(self.ref_code_path) - success = False - - # Set file paths - java_student_directory = os.getcwd() + '/' - java_ref_file_name = (ref_path.split('/')[-1]).split('.')[0] - - # Set command variables - compile_command = 'javac {0}'.format(submit_path), - compile_main = ('javac {0} -classpath ' - '{1} -d {2}').format(ref_path, - java_student_directory, - java_student_directory) - run_command_args = "java -cp {0} {1}".format(java_student_directory, - java_ref_file_name) - remove_user_output = "{0}{1}.class".format(java_student_directory, - 'Test') - remove_ref_output = "{0}{1}.class".format(java_student_directory, - java_ref_file_name) - - success, err = self.check_code(ref_path, submit_path, compile_command, - compile_main, run_command_args, - remove_user_output, remove_ref_output) - - # Delete the created file. - os.remove(submit_path) - - return success, err - - -registry.register('java', EvaluateJavaCode) diff --git a/testapp/exam/evaluate_python_code.py b/testapp/exam/evaluate_python_code.py deleted file mode 100644 index 8892ae6..0000000 --- a/testapp/exam/evaluate_python_code.py +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env python -import sys -import traceback -import os -from os.path import join -import importlib - -# local imports -from evaluate_code import EvaluateCode -from language_registry import registry - - -class EvaluatePythonCode(EvaluateCode): - """Tests the Python code obtained from Code Server""" - # Public Protocol ########## - def evaluate_code(self): - success = False - - try: - tb = None - test_code = self._create_test_case() - submitted = compile(self.user_answer, '', mode='exec') - g = {} - exec submitted in g - _tests = compile(test_code, '', mode='exec') - exec _tests in g - except AssertionError: - type, value, tb = sys.exc_info() - info = traceback.extract_tb(tb) - fname, lineno, func, text = info[-1] - text = str(test_code).splitlines()[lineno-1] - err = "{0} {1} in: {2}".format(type.__name__, str(value), text) - else: - success = True - err = 'Correct answer' - - del tb - return success, err - - # Private Protocol ########## - def _create_test_case(self): - """ - Create assert based test cases in python - """ - test_code = "" - for test_case in self.test_case_data: - pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) \ - if test_case.get('pos_args') else "" - kw_args = ", ".join(str(k+"="+a) for k, a - in test_case.get('kw_args').iteritems()) \ - if test_case.get('kw_args') else "" - args = pos_args + ", " + kw_args if pos_args and kw_args \ - else pos_args or kw_args - tcode = "assert {0}({1}) == {2}".format(test_case.get('func_name'), - args, test_case.get('expected_answer')) - test_code += tcode + "\n" - return test_code - - -registry.register('python', EvaluatePythonCode) diff --git a/testapp/exam/evaluate_scilab_code.py b/testapp/exam/evaluate_scilab_code.py deleted file mode 100644 index 899abc9..0000000 --- a/testapp/exam/evaluate_scilab_code.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python -import traceback -import os -from os.path import join, isfile -import subprocess -import re -import importlib - -# local imports -from evaluate_code import EvaluateCode -from language_registry import registry - - -class EvaluateScilabCode(EvaluateCode): - """Tests the Scilab code obtained from Code Server""" - # Public Protocol ########## - def evaluate_code(self): - submit_path = self.create_submit_code_file('function.sci') - ref_path, test_case_path = self.set_test_code_file_path() - success = False - - cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path) - cmd += ' | timeout 8 scilab-cli -nb' - ret = self.run_command(cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - - # Get only the error. - stderr = self._get_error(stdout) - if stderr is None: - # Clean output - stdout = self._strip_output(stdout) - if proc.returncode == 5: - success, err = True, "Correct answer" - else: - err = add_err + stdout - else: - err = add_err + stderr - - # Delete the created file. - os.remove(submit_path) - - return success, err - - # Private Protocol ########## - def _remove_scilab_exit(self, string): - """ - Removes exit, quit and abort from the scilab code - """ - new_string = "" - i = 0 - for line in string.splitlines(): - new_line = re.sub(r"exit.*$", "", line) - new_line = re.sub(r"quit.*$", "", new_line) - new_line = re.sub(r"abort.*$", "", new_line) - if line != new_line: - i = i + 1 - new_string = new_string + '\n' + new_line - return new_string, i - - # Private Protocol ########## - def _get_error(self, string): - """ - Fetches only the error from the string. - Returns None if no error. - """ - obj = re.search("!.+\n.+", string) - if obj: - return obj.group() - return None - - # Private Protocol ########## - def _strip_output(self, out): - """ - Cleans whitespace from the output - """ - strip_out = "Message" - for l in out.split('\n'): - if l.strip(): - strip_out = strip_out+"\n"+l.strip() - return strip_out - - -registry.register('scilab', EvaluateScilabCode) diff --git a/testapp/exam/java_code_evaluator.py b/testapp/exam/java_code_evaluator.py new file mode 100644 index 0000000..4a80acd --- /dev/null +++ b/testapp/exam/java_code_evaluator.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python +import traceback +import pwd +import os +from os.path import join, isfile +import subprocess +import importlib + +# local imports +# from c_code_evaluator import CCodeEvaluator +from code_evaluator import CodeEvaluator +# from language_registry import registry + + +class JavaCodeEvaluator(CodeEvaluator): + """Tests the Java code obtained from Code Server""" + def __init__(self, test_case_data, language, user_answer, + ref_code_path=None, in_dir=None): + super(JavaCodeEvaluator, self).__init__(test_case_data, language, user_answer, + ref_code_path, in_dir) + self.submit_path = self.create_submit_code_file('Test.java') + self.test_case_args = self.setup_code_evaluator() + + # Private Protocol ########## + def setup_code_evaluator(self): + super(JavaCodeEvaluator, self).setup_code_evaluator() + + ref_path, test_case_path = self.set_test_code_file_path(self.ref_code_path) + + # Set file paths + java_student_directory = os.getcwd() + '/' + java_ref_file_name = (ref_path.split('/')[-1]).split('.')[0] + + # Set command variables + compile_command = 'javac {0}'.format(submit_path), + compile_main = ('javac {0} -classpath ' + '{1} -d {2}').format(ref_path, + java_student_directory, + java_student_directory) + run_command_args = "java -cp {0} {1}".format(java_student_directory, + java_ref_file_name) + remove_user_output = "{0}{1}.class".format(java_student_directory, + 'Test') + remove_ref_output = "{0}{1}.class".format(java_student_directory, + java_ref_file_name) + + return ref_path, submit_path, compile_command, compile_main, run_command_args, remove_user_output, remove_ref_output + + def teardown_code_evaluator(self): + # Delete the created file. + super(JavaCodeEvaluator, self).teardown_code_evaluator() + os.remove(self.submit_path) + + + # Public Protocol ########## + # def evaluate_code(self): + # submit_path = self.create_submit_code_file('Test.java') + # ref_path, test_case_path = self.set_test_code_file_path(self.ref_code_path) + # success = False + + # # Set file paths + # java_student_directory = os.getcwd() + '/' + # java_ref_file_name = (ref_path.split('/')[-1]).split('.')[0] + + # # Set command variables + # compile_command = 'javac {0}'.format(submit_path), + # compile_main = ('javac {0} -classpath ' + # '{1} -d {2}').format(ref_path, + # java_student_directory, + # java_student_directory) + # run_command_args = "java -cp {0} {1}".format(java_student_directory, + # java_ref_file_name) + # remove_user_output = "{0}{1}.class".format(java_student_directory, + # 'Test') + # remove_ref_output = "{0}{1}.class".format(java_student_directory, + # java_ref_file_name) + + # success, err = self.check_code(ref_path, submit_path, compile_command, + # compile_main, run_command_args, + # remove_user_output, remove_ref_output) + + # # Delete the created file. + # os.remove(submit_path) + + # return success, err + + def check_code(self, ref_code_path, submit_code_path, compile_command, + compile_main, run_command_args, remove_user_output, + remove_ref_output): + """ Function validates student code using instructor code as + reference.The first argument ref_code_path, is the path to + instructor code, it is assumed to have executable permission. + The second argument submit_code_path, is the path to the student + code, it is assumed to have executable permission. + + Returns + -------- + + returns (True, "Correct answer") : If the student function returns + expected output when called by reference code. + + returns (False, error_msg): If the student function fails to return + expected output when called by reference code. + + Returns (False, error_msg): If mandatory arguments are not files or + if the required permissions are not given to the file(s). + + """ + if not isfile(ref_code_path): + return False, "No file at %s or Incorrect path" % ref_code_path + if not isfile(submit_code_path): + return False, 'No file at %s or Incorrect path' % submit_code_path + + success = False + # output_path = os.getcwd() + '/output' + ret = self.compile_command(compile_command) + proc, stdnt_stderr = ret + # if self.language == "java": + stdnt_stderr = self.remove_null_substitute_char(stdnt_stderr) + + # Only if compilation is successful, the program is executed + # And tested with testcases + if stdnt_stderr == '': + ret = self.compile_command(compile_main) + proc, main_err = ret + # if self.language == "java": + main_err = self.remove_null_substitute_char(main_err) + + if main_err == '': + ret = self.run_command(run_command_args, stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc, stdout, stderr = ret + if proc.returncode == 0: + success, err = True, "Correct answer" + else: + err = stdout + "\n" + stderr + os.remove(remove_ref_output) + else: + err = "Error:" + try: + error_lines = main_err.splitlines() + for e in error_lines: + if ':' in e: + err = err + "\n" + e.split(":", 1)[1] + else: + err = err + "\n" + e + except: + err = err + "\n" + main_err + os.remove(remove_user_output) + else: + err = "Compilation Error:" + try: + error_lines = stdnt_stderr.splitlines() + for e in error_lines: + if ':' in e: + err = err + "\n" + e.split(":", 1)[1] + else: + err = err + "\n" + e + except: + err = err + "\n" + stdnt_stderr + + return success, err + + +# registry.register('java', EvaluateJavaCode) diff --git a/testapp/exam/language_registry.py b/testapp/exam/language_registry.py index 4d56de2..8700d32 100644 --- a/testapp/exam/language_registry.py +++ b/testapp/exam/language_registry.py @@ -1,17 +1,31 @@ -#!/usr/bin/env python +from settings import language_register +registry = None -class LanguageRegistry(object): +def set_registry(): + globals registry = _LanguageRegistry() + +def get_registry(): + return registry + +class _LanguageRegistry(object): def __init__(self): - self._registry = {} + for language, module in language_register.iteritems(): + self._register[language] = None # Public Protocol ########## def get_class(self, language): - return self._registry[language] + if not self._register[language]: + self._register[language] = language_register[language] - # Public Protocol ########## - def register(self, language, cls): - self._registry[language] = cls + cls = self._register[language] + module_name, class_name = cls.split(".") + # load the module, will raise ImportError if module cannot be loaded + get_module = importlib.import_module(module_name) + # get the class, will raise AttributeError if class cannot be found + get_class = getattr(get_module, class_name) + return get_class + # def register(self, language, cls): + # self._register[language] = cls -registry = LanguageRegistry() diff --git a/testapp/exam/python_code_evaluator.py b/testapp/exam/python_code_evaluator.py new file mode 100644 index 0000000..05a5063 --- /dev/null +++ b/testapp/exam/python_code_evaluator.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +import sys +import traceback +import os +from os.path import join +import importlib + +# local imports +from code_evaluator import CodeEvaluator +# from language_registry import registry + + +class PythonCodeEvaluator(CodeEvaluator): + """Tests the Python code obtained from Code Server""" + # Private Protocol ########## + def check_code(self): + success = False + + try: + tb = None + test_code = self._create_test_case() + submitted = compile(self.user_answer, '', mode='exec') + g = {} + exec submitted in g + _tests = compile(test_code, '', mode='exec') + exec _tests in g + except AssertionError: + type, value, tb = sys.exc_info() + info = traceback.extract_tb(tb) + fname, lineno, func, text = info[-1] + text = str(test_code).splitlines()[lineno-1] + err = "{0} {1} in: {2}".format(type.__name__, str(value), text) + else: + success = True + err = 'Correct answer' + + del tb + return success, err + + # # Public Protocol ########## + # def evaluate_code(self): + # success = False + + # try: + # tb = None + # test_code = self._create_test_case() + # submitted = compile(self.user_answer, '', mode='exec') + # g = {} + # exec submitted in g + # _tests = compile(test_code, '', mode='exec') + # exec _tests in g + # except AssertionError: + # type, value, tb = sys.exc_info() + # info = traceback.extract_tb(tb) + # fname, lineno, func, text = info[-1] + # text = str(test_code).splitlines()[lineno-1] + # err = "{0} {1} in: {2}".format(type.__name__, str(value), text) + # else: + # success = True + # err = 'Correct answer' + + # del tb + # return success, err + + # Private Protocol ########## + def _create_test_case(self): + """ + Create assert based test cases in python + """ + test_code = "" + for test_case in self.test_case_data: + pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) \ + if test_case.get('pos_args') else "" + kw_args = ", ".join(str(k+"="+a) for k, a + in test_case.get('kw_args').iteritems()) \ + if test_case.get('kw_args') else "" + args = pos_args + ", " + kw_args if pos_args and kw_args \ + else pos_args or kw_args + tcode = "assert {0}({1}) == {2}".format(test_case.get('func_name'), + args, test_case.get('expected_answer')) + test_code += tcode + "\n" + return test_code + + +# registry.register('python', EvaluatePythonCode) diff --git a/testapp/exam/scilab_code_evaluator.py b/testapp/exam/scilab_code_evaluator.py new file mode 100644 index 0000000..073fbcb --- /dev/null +++ b/testapp/exam/scilab_code_evaluator.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python +import traceback +import os +from os.path import join, isfile +import subprocess +import re +import importlib + +# local imports +from code_evaluator import CodeEvaluator +# from language_registry import registry + + +class ScilabCodeEvaluator(CodeEvaluator): + """Tests the Scilab code obtained from Code Server""" + def __init__(self, test_case_data, language, user_answer, + ref_code_path=None, in_dir=None): + super(ScilabCodeEvaluator, self).__init__(test_case_data, language, user_answer, + ref_code_path, in_dir) + self.submit_path = self.create_submit_code_file('function.sci') + self.test_case_args = self.setup_code_evaluator() + + # Private Protocol ########## + def setup_code_evaluator(self): + super(ScilabCodeEvaluator, self).setup_code_evaluator() + + ref_path, test_case_path = self.set_test_code_file_path(self.ref_code_path) + + return ref_path, # Return as a tuple + + def teardown_code_evaluator(self): + # Delete the created file. + super(ScilabCodeEvaluator, self).teardown_code_evaluator() + os.remove(self.submit_path) + + def check_code(self, ref_path): + success = False + + cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path) + cmd += ' | timeout 8 scilab-cli -nb' + ret = self.run_command(cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc, stdout, stderr = ret + + # Get only the error. + stderr = self._get_error(stdout) + if stderr is None: + # Clean output + stdout = self._strip_output(stdout) + if proc.returncode == 5: + success, err = True, "Correct answer" + else: + err = add_err + stdout + else: + err = add_err + stderr + + return success, err + + # # Public Protocol ########## + # def evaluate_code(self): + # submit_path = self.create_submit_code_file('function.sci') + # ref_path, test_case_path = self.set_test_code_file_path() + # success = False + + # cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path) + # cmd += ' | timeout 8 scilab-cli -nb' + # ret = self.run_command(cmd, + # shell=True, + # stdout=subprocess.PIPE, + # stderr=subprocess.PIPE) + # proc, stdout, stderr = ret + + # # Get only the error. + # stderr = self._get_error(stdout) + # if stderr is None: + # # Clean output + # stdout = self._strip_output(stdout) + # if proc.returncode == 5: + # success, err = True, "Correct answer" + # else: + # err = add_err + stdout + # else: + # err = add_err + stderr + + # # Delete the created file. + # os.remove(submit_path) + + # return success, err + + # Private Protocol ########## + def _remove_scilab_exit(self, string): + """ + Removes exit, quit and abort from the scilab code + """ + new_string = "" + i = 0 + for line in string.splitlines(): + new_line = re.sub(r"exit.*$", "", line) + new_line = re.sub(r"quit.*$", "", new_line) + new_line = re.sub(r"abort.*$", "", new_line) + if line != new_line: + i = i + 1 + new_string = new_string + '\n' + new_line + return new_string, i + + # Private Protocol ########## + def _get_error(self, string): + """ + Fetches only the error from the string. + Returns None if no error. + """ + obj = re.search("!.+\n.+", string) + if obj: + return obj.group() + return None + + # Private Protocol ########## + def _strip_output(self, out): + """ + Cleans whitespace from the output + """ + strip_out = "Message" + for l in out.split('\n'): + if l.strip(): + strip_out = strip_out+"\n"+l.strip() + return strip_out + + +# registry.register('scilab', EvaluateScilabCode) diff --git a/testapp/exam/settings.py b/testapp/exam/settings.py index 682516f..497a620 100644 --- a/testapp/exam/settings.py +++ b/testapp/exam/settings.py @@ -18,3 +18,11 @@ SERVER_TIMEOUT = 2 # reason set this to the root you have to serve at. In the above example # host.org/foo/exam set URL_ROOT='/foo' URL_ROOT = '' + +language_register = {"python": "python_code_evaluator", + "c": "c_cpp_code_evaluator", + "cpp": "c_cpp_code_evaluator", + "java": "java_evaluator", + "bash": "bash_evaluator", + "scilab": "scilab_evaluator", + } diff --git a/testapp/test_server.py b/testapp/test_server.py index 39995e1..9319d5b 100644 --- a/testapp/test_server.py +++ b/testapp/test_server.py @@ -1,12 +1,15 @@ import unittest import os -from exam import evaluate_c_code, evaluate_cpp_code, evaluate_java_code, evaluate_python_code, evaluate_scilab_code, evaluate_bash_code -from exam.language_registry import registry +# from exam import evaluate_c_code, evaluate_cpp_code, evaluate_java_code, evaluate_python_code, evaluate_scilab_code, evaluate_bash_code +from exam import c_cpp_code_evaluator, bash_code_evaluator, python_code_evaluator, scilab_code_evaluator, java_code_evaluator +from exam.language_registry import set_registry, get_registry from exam.settings import SERVER_TIMEOUT + class RegistryTestCase(unittest.TestCase): def setUp(self): - self.registry_object = registry + set_registry() + self.registry_object = get_registry() def test_set_register(self): self.registry_object.register("demo_language", "demo_object") @@ -17,6 +20,9 @@ class RegistryTestCase(unittest.TestCase): cls = self.registry_object.get_class("demo_language") self.assertEquals(cls, "demo_object") + def tearDown(self): + self.registry_object = None + ############################################################################### class PythonEvaluationTestCases(unittest.TestCase): @@ -33,8 +39,8 @@ class PythonEvaluationTestCases(unittest.TestCase): def test_correct_answer(self): user_answer = "def add(a, b):\n\treturn a + b""" - get_class = evaluate_python_code.EvaluatePythonCode(self.test_case_data, self.language, user_answer, ref_code_path=None, in_dir=None) - result = get_class.run_code() + get_class = python_code_evaluator.PythonCodeEvaluator(self.test_case_data, self.language, user_answer, ref_code_path=None, in_dir=None) + result = get_class.code_evaluator() self.assertTrue(result.get("success")) self.assertEqual(result.get("error"), "Correct answer") @@ -46,8 +52,8 @@ class PythonEvaluationTestCases(unittest.TestCase): "pos_args": ["3", "2"], "kw_args": {} }] - get_class = evaluate_python_code.EvaluatePythonCode(self.test_case_data, self.language, user_answer, ref_code_path=None, in_dir=None) - result = get_class.run_code() + get_class = python_code_evaluator.PythonCodeEvaluator(self.test_case_data, self.language, user_answer, ref_code_path=None, in_dir=None) + result = get_class.code_evaluator() self.assertFalse(result.get("success")) self.assertEqual(result.get("error"), "AssertionError in: assert add(3, 2) == 5") @@ -59,8 +65,8 @@ class PythonEvaluationTestCases(unittest.TestCase): "pos_args": ["3", "2"], "kw_args": {} }] - get_class = evaluate_python_code.EvaluatePythonCode(self.test_case_data, self.language, user_answer, ref_code_path=None, in_dir=None) - result = get_class.run_code() + get_class = python_code_evaluator.PythonCodeEvaluator(self.test_case_data, self.language, user_answer, ref_code_path=None, in_dir=None) + result = get_class.code_evaluator() self.assertFalse(result.get("success")) self.assertEquals(result.get("error"), self.timeout_msg) @@ -78,24 +84,24 @@ class CEvaluationTestCases(unittest.TestCase): def test_correct_answer(self): user_answer = "int add(int a, int b)\n{return a+b;}" - get_class = evaluate_c_code.EvaluateCCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = c_cpp_code_evaluator.CCppCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertTrue(result.get("success")) self.assertEqual(result.get("error"), "Correct answer") def test_compilation_error(self): user_answer = "int add(int a, int b)\n{return a+b}" - get_class = evaluate_c_code.EvaluateCCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = c_cpp_code_evaluator.CCppCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertFalse(result.get("success")) self.assertTrue("Compilation Error" in result.get("error")) def test_infinite_loop(self): user_answer = "int add(int a, int b)\n{while(1>0){}}" - get_class = evaluate_c_code.EvaluateCCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = c_cpp_code_evaluator.CCppCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertFalse(result.get("success")) self.assertEquals(result.get("error"), self.timeout_msg) @@ -114,24 +120,24 @@ class CppEvaluationTestCases(unittest.TestCase): def test_correct_answer(self): user_answer = "int add(int a, int b)\n{return a+b;}" - get_class = evaluate_cpp_code.EvaluateCppCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = c_cpp_code_evaluator.CCppCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertTrue(result.get("success")) self.assertEqual(result.get("error"), "Correct answer") def test_compilation_error(self): user_answer = "int add(int a, int b)\n{return a+b}" - get_class = evaluate_cpp_code.EvaluateCppCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = c_cpp_code_evaluator.CCppCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertFalse(result.get("success")) self.assertTrue("Compilation Error" in result.get("error")) def test_infinite_loop(self): user_answer = "int add(int a, int b)\n{while(1>0){}}" - get_class = evaluate_cpp_code.EvaluateCppCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = c_cpp_code_evaluator.CCppCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertFalse(result.get("success")) self.assertEquals(result.get("error"), self.timeout_msg) @@ -149,25 +155,25 @@ class BashEvaluationTestCases(unittest.TestCase): def test_correct_answer(self): user_answer = "#!/bin/bash\n[[ $# -eq 2 ]] && echo $(( $1 + $2 )) && exit $(( $1 + $2 ))" - get_class = evaluate_bash_code.EvaluateBashCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = bash_code_evaluator.BashCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertTrue(result.get("success")) self.assertEqual(result.get("error"), "Correct answer") def test_error(self): user_answer = "#!/bin/bash\n[[ $# -eq 2 ]] && echo $(( $1 - $2 )) && exit $(( $1 - $2 ))" - get_class = evaluate_bash_code.EvaluateBashCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() - + get_class = bash_code_evaluator.BashCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() + self.assertFalse(result.get("success")) self.assertTrue("Error" in result.get("error")) def test_infinite_loop(self): user_answer = "#!/bin/bash\nwhile [ 1 ] ; do echo "" > /dev/null ; done" - get_class = evaluate_bash_code.EvaluateBashCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() - + get_class = bash_code_evaluator.BashCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() + self.assertFalse(result.get("success")) self.assertEquals(result.get("error"), self.timeout_msg) @@ -198,7 +204,7 @@ class JavaEvaluationTestCases(unittest.TestCase): self.assertFalse(result.get("success")) self.assertTrue("Error" in result.get("error")) - def test_infinite_loop(self): + def test_infinite_loop(self): user_answer = "class Test {\n\tint square_num(int a) {\n\t\twhile(0==0){\n\t\t}\n\t}\n}" get_class = evaluate_java_code.EvaluateJavaCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) result = get_class.run_code() @@ -217,24 +223,24 @@ class ScilabEvaluationTestCases(unittest.TestCase): def test_correct_answer(self): user_answer = "funcprot(0)\nfunction[c]=add(a,b)\n\tc=a+b;\nendfunction" - get_class = evaluate_scilab_code.EvaluateScilabCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = scilab_code_evaluator.ScilabCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertTrue(result.get("success")) self.assertEqual(result.get("error"), "Correct answer") def test_correct_answer_2(self): user_answer = "funcprot(0)\nfunction[c]=add(a,b)\n\tc=a-b;\nendfunction" - get_class = evaluate_scilab_code.EvaluateScilabCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = scilab_code_evaluator.ScilabCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertTrue(result.get("success")) self.assertEqual(result.get("error"), "Correct answer") def test_error(self): user_answer = "funcprot(0)\nfunction[c]=add(a,b)\n\t c=a+b;\ndis(\tendfunction" - get_class = evaluate_java_code.EvaluateJavaCode(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) - result = get_class.run_code() + get_class = scilab_code_evaluator.ScilabCodeEvaluator(self.test_case_data, self.language, user_answer, self.ref_code_path, self.in_dir) + result = get_class.code_evaluator() self.assertFalse(result.get("success")) self.assertTrue("Error" in result.get("error")) -- cgit