From 3ffdba6e587422a0f2955879d12e0b2aeac342e1 Mon Sep 17 00:00:00 2001 From: ankitjavalkar Date: Mon, 20 Apr 2015 21:42:02 +0530 Subject: Code review - code refactoring as per suggestion - Add subclasses for different languages - Create seperate modules for different languages - Dynamic selection of subclasses based on language used - Add testcases --- testapp/exam/code_server.py | 399 ++++---------------------------------------- 1 file changed, 35 insertions(+), 364 deletions(-) (limited to 'testapp/exam/code_server.py') diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py index 5cff7dc..ae68398 100755 --- a/testapp/exam/code_server.py +++ b/testapp/exam/code_server.py @@ -30,11 +30,12 @@ from multiprocessing import Process, Queue import subprocess import re import json +import importlib # Local imports. from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT -MY_DIR = abspath(dirname(__file__)) +MY_DIR = abspath(dirname(__file__)) def run_as_nobody(): """Runs the current process as nobody.""" @@ -78,13 +79,13 @@ def delete_signal_handler(): # `TestCode` class. ############################################################################### class TestCode(object): - """Evaluates and tests the code obtained from Code Server""" + """Tests the code obtained from Code Server""" def __init__(self, test_parameter, language, user_answer, ref_code_path=None, in_dir=None): msg = 'Code took more than %s seconds to run. You probably '\ 'have an infinite loop in your code.' % SERVER_TIMEOUT self.timeout_msg = msg self.test_parameter = test_parameter - self.language = language + self.language = language.lower() self.user_answer = user_answer self.ref_code_path = ref_code_path self.in_dir = in_dir @@ -110,17 +111,6 @@ class TestCode(object): A tuple: (success, error message). """ - success = False - prev_handler = None - - methods = {"python": 'evaluate_python_code', - "bash": 'evaluate_bash_code', - "C": "evaluate_c_cpp_java_code", - "C++": "evaluate_c_cpp_java_code", - "java": "evaluate_c_cpp_java_code", - "scilab": "evaluate_scilab_code", - } - get_method_based_on_lang = methods[self.language] self._change_dir(self.in_dir) # Add a new signal handler for the execution of this code. @@ -129,8 +119,7 @@ class TestCode(object): # Do whatever testing needed. try: - evaluate_code = getattr(self, get_method_based_on_lang) - success, err = evaluate_code() + success, err = self.evaluate_code() except TimeoutException: err = self.timeout_msg @@ -147,93 +136,17 @@ class TestCode(object): result = {'success': success, 'error': err} return result - def evaluate_python_code(self): - success = False - - try: - tb = None - test_code = self._create_test_case() - submitted = compile(self.user_answer, '', mode='exec') - g = {} - exec submitted in g - _tests = compile(test_code, '', mode='exec') - exec _tests in g - except AssertionError: - type, value, tb = sys.exc_info() - info = traceback.extract_tb(tb) - fname, lineno, func, text = info[-1] - text = str(test_code).splitlines()[lineno-1] - err = "{0} {1} in: {2}".format(type.__name__, str(value), text) - else: - success = True - err = 'Correct answer' - - del tb - return success, err - - def evaluate_c_cpp_java_code(self): - submit_path = self._create_submit_code_file() - ref_path, test_case_path = self._set_test_code_file_path() - success = False - - success, err = self._check_code(ref_path, submit_path) - - # Delete the created file. - os.remove(submit_path) - - return success, err - - def evaluate_scilab_code(self): - submit_path = self._create_submit_code_file() - ref_path, test_case_path = self._set_test_code_file_path() - success = False - - cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path) - cmd += ' | timeout 8 scilab-cli -nb' - ret = self._run_command(cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - - # Get only the error. - stderr = self._get_error(stdout) - if stderr is None: - # Clean output - stdout = self._strip_output(stdout) - if proc.returncode == 5: - success, err = True, "Correct answer" - else: - err = add_err + stdout - else: - err = add_err + stderr - - # Delete the created file. - os.remove(submit_path) - - return success, err - - def evaluate_bash_code(self): - submit_path = self._create_submit_code_file() - ref_path, test_case_path = self._set_test_code_file_path() - success = False - - success, err = self.check_bash_script(ref_path, submit_path, - test_case_path) - - # Delete the created file. - os.remove(submit_path) + def evaluate_code(self): + pass - return success, err - - def _create_submit_code_file(self): + def _create_submit_code_file(self, file_name): """ Write the code (`answer`) to a file and set the file path""" - user_answer_file = {'C': 'submit.c', 'java': 'Test.java', - 'scilab': 'function.sci', 'C++': 'submitstd.cpp', - 'bash': 'submit.sh'} + # user_answer_file = {'c': 'submit.c', 'java': 'Test.java', + # 'scilab': 'function.sci', 'cpp': 'submitstd.cpp', + # 'bash': 'submit.sh'} - # File extension depending on the question language - submit_f = open(user_answer_file.get(self.language), 'w') + # File name/extension depending on the question language + submit_f = open(file_name, 'w') submit_f.write(self.user_answer.lstrip()) submit_f.close() submit_path = abspath(submit_f.name) @@ -242,8 +155,13 @@ class TestCode(object): return submit_path - def _set_test_code_file_path(self): - ref_path, test_case_path = self.ref_code_path.split(',') + def _set_exec(self, fname): + os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR + | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP + | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) + + def _set_test_code_file_path(self, ref_path=None, test_case_path=None): + # ref_path, test_case_path = self.ref_code_path.split(',') if ref_path and not ref_path.startswith('/'): ref_path = join(MY_DIR, ref_path) @@ -252,202 +170,6 @@ class TestCode(object): return ref_path, test_case_path - def _check_code(self, ref_code_path, submit_code_path): - """ Function validates student code using instructor code as - reference.The first argument ref_code_path, is the path to - instructor code, it is assumed to have executable permission. - The second argument submit_code_path, is the path to the student - code, it is assumed to have executable permission. - - Returns - -------- - - returns (True, "Correct answer") : If the student function returns - expected output when called by reference code. - - returns (False, error_msg): If the student function fails to return - expected output when called by reference code. - - Returns (False, error_msg): If mandatory arguments are not files or - if the required permissions are not given to the file(s). - - """ - - language_dependent_path = { - 'c_user_output_path': os.getcwd() + '/output', - 'c_ref_output_path': os.getcwd() + '/executable', - 'java_student_directory': os.getcwd() + '/', - # 'java_student_file_name': 'Test', - 'java_ref_file_name': (ref_code_path.split('/')[-1]).split('.')[0], - } - - language_dependent_var = { - 'C': {'compile_command': 'g++ {0} -c -o {1}'.format(submit_code_path, - language_dependent_path.get('c_user_output_path')), - 'compile_main': 'g++ {0} {1} -o {2}'.format(ref_code_path, - language_dependent_path.get('c_user_output_path'), - language_dependent_path.get('c_ref_output_path')), - 'run_command_args': [language_dependent_path.get('c_ref_output_path')], - 'remove_user_output': language_dependent_path.get('c_user_output_path'), - 'remove_ref_output': language_dependent_path.get('c_ref_output_path') - }, - 'java':{'compile_command': 'javac {0}'.format(submit_code_path), - 'compile_main': 'javac {0} -classpath {1} -d {2}'.format(ref_code_path, - language_dependent_path.get('java_student_directory'), - language_dependent_path.get('java_student_directory')), - 'run_command_args': "java -cp {0} {1}".format( - language_dependent_path.get('java_student_directory'), - language_dependent_path.get('java_ref_file_name')), - 'remove_user_output': "%s%s.class".format( - language_dependent_path.get('java_student_directory'), - 'Test'), - 'remove_ref_output': "%s%s.class".format( - language_dependent_path.get('java_student_directory'), - language_dependent_path.get('java_ref_file_name')), - } - } - - if not isfile(ref_code_path): - return False, "No file at %s or Incorrect path" % ref_code_path - if not isfile(submit_code_path): - return False, 'No file at %s or Incorrect path' % submit_code_path - - success = False - # output_path = os.getcwd() + '/output' - compile_command = language_dependent_var.get(self.language).get('compile_command') - ret = self._compile_command(compile_command) - proc, stdnt_stderr = ret - if self.language == "java": - stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr) - - # Only if compilation is successful, the program is executed - # And tested with testcases - if stdnt_stderr == '': - compile_main = language_dependent_var.get(self.language).get('compile_main') - ret = self._compile_command(compile_main) - proc, main_err = ret - if self.language == "java": - main_err = self._remove_null_substitute_char(main_err) - - if main_err == '': - run_command_args = language_dependent_var.get(self.language).get('run_command_args') - ret = self._run_command(run_command_args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - if proc.returncode == 0: - success, err = True, "Correct answer" - else: - err = stdout + "\n" + stderr - os.remove(language_dependent_var.get(self.language).get('remove_ref_output')) - else: - err = "Error:" - try: - error_lines = main_err.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + main_err - os.remove(language_dependent_var.get(self.language).get('remove_user_output')) - else: - err = "Compilation Error:" - try: - error_lines = stdnt_stderr.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + stdnt_stderr - - return success, err - - def check_bash_script(self, ref_path, submit_path, - test_case_path=None): - """ Function validates student script using instructor script as - reference. Test cases can optionally be provided. The first argument - ref_path, is the path to instructor script, it is assumed to - have executable permission. The second argument submit_path, is - the path to the student script, it is assumed to have executable - permission. The Third optional argument is the path to test the - scripts. Each line in this file is a test case and each test case is - passed to the script as standard arguments. - - Returns - -------- - - returns (True, "Correct answer") : If the student script passes all - test cases/have same output, when compared to the instructor script - - returns (False, error_msg): If the student script fails a single - test/have dissimilar output, when compared to the instructor script. - - Returns (False, error_msg): If mandatory arguments are not files or if - the required permissions are not given to the file(s). - - """ - if not isfile(ref_path): - return False, "No file at %s or Incorrect path" % ref_path - if not isfile(submit_path): - return False, "No file at %s or Incorrect path" % submit_path - if not os.access(ref_path, os.X_OK): - return False, "Script %s is not executable" % ref_path - if not os.access(submit_path, os.X_OK): - return False, "Script %s is not executable" % submit_path - - if test_case_path is None or "": - ret = self._run_command(ref_path, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, inst_stdout, inst_stderr = ret - ret = self._run_command(submit_path, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdnt_stdout, stdnt_stderr = ret - if inst_stdout == stdnt_stdout: - return True, "Correct answer" - else: - err = "Error: expected %s, got %s" % (inst_stderr, - stdnt_stderr) - return False, err - else: - if not isfile(test_case_path): - return False, "No test case at %s" % test_case_path - if not os.access(ref_path, os.R_OK): - return False, "Test script %s, not readable" % test_case_path - valid_answer = True # We initially make it one, so that we can - # stop once a test case fails - loop_count = 0 # Loop count has to be greater than or - # equal to one. - # Useful for caching things like empty - # test files,etc. - test_cases = open(test_case_path).readlines() - num_lines = len(test_cases) - for test_case in test_cases: - loop_count += 1 - if valid_answer: - args = [ref_path] + [x for x in test_case.split()] - ret = self._run_command(args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, inst_stdout, inst_stderr = ret - args = [submit_path]+[x for x in test_case.split()] - ret = self._run_command(args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdnt_stdout, stdnt_stderr = ret - valid_answer = inst_stdout == stdnt_stdout - if valid_answer and (num_lines == loop_count): - return True, "Correct answer" - else: - err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr, - stdnt_stdout+stdnt_stderr) - return False, err - def _run_command(self, cmd_args, *args, **kw): """Run a command in a subprocess while blocking, the process is killed if it takes more than 2 seconds to run. Return the Popen object, the @@ -481,70 +203,6 @@ class TestCode(object): raise return proc_compile, err - def _remove_null_substitute_char(self, string): - """Returns a string without any null and substitute characters""" - stripped = "" - for c in string: - if ord(c) is not 26 and ord(c) is not 0: - stripped = stripped + c - return ''.join(stripped) - - def _remove_scilab_exit(self, string): - """ - Removes exit, quit and abort from the scilab code - """ - new_string = "" - i=0 - for line in string.splitlines(): - new_line = re.sub(r"exit.*$","",line) - new_line = re.sub(r"quit.*$","",new_line) - new_line = re.sub(r"abort.*$","",new_line) - if line != new_line: - i=i+1 - new_string = new_string +'\n'+ new_line - return new_string, i - - def _get_error(self, string): - """ - Fetches only the error from the string. - Returns None if no error. - """ - obj = re.search("!.+\n.+",string); - if obj: - return obj.group() - return None - - def _strip_output(self, out): - """ - Cleans whitespace from the output - """ - strip_out = "Message" - for l in out.split('\n'): - if l.strip(): - strip_out = strip_out+"\n"+l.strip() - return strip_out - - def _set_exec(self, fname): - os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR - | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP - | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) - - def _create_test_case(self): - """ - Create assert based test cases in python - """ - test_code = "" - for test_case in self.test_parameter: - pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \ - else "" - kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \ - if test_case.get('kw_args') else "" - args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args - tcode = "assert {0}({1}) == {2}" \ - .format(test_case.get('func_name'), args, test_case.get('expected_answer')) - test_code += tcode + "\n" - return test_code - def _change_dir(self, in_dir): if in_dir is not None and isdir(in_dir): os.chdir(in_dir) @@ -569,13 +227,26 @@ class CodeServer(object): user_answer = info_parameter.get("user_answer") ref_code_path = info_parameter.get("ref_code_path") - tc = TestCode(test_parameter, language, user_answer, ref_code_path, in_dir) - result = tc.run_code() + eval_module_name = "evaluate_{0}".format(language.lower()) + eval_class_name = "Evaluate{0}".format(language.capitalize()) + + get_class = self._sub_class_factory(eval_module_name, eval_class_name) + + test_code_class = get_class(test_parameter, language, user_answer, ref_code_path, in_dir) + result = test_code_class.run_code() # Put us back into the server pool queue since we are free now. self.queue.put(self.port) return json.dumps(result) + def _sub_class_factory(self, module_name, class_name): + # load the module, will raise ImportError if module cannot be loaded + get_module = importlib.import_module(module_name) + # get the class, will raise AttributeError if class cannot be found + get_class = getattr(get_module, class_name) + + return get_class + def run(self): """Run XMLRPC server, serving our methods.""" server = SimpleXMLRPCServer(("localhost", self.port)) -- cgit