diff options
author | ankitjavalkar | 2015-04-20 21:38:11 +0530 |
---|---|---|
committer | ankitjavalkar | 2015-04-26 19:46:01 +0530 |
commit | cbeffec80d30fe2a9048644f5b0345f797479c92 (patch) | |
tree | c544330859e16293fcae0fb71fab238fc4257520 /testapp | |
parent | 3d998f9d467ccfe795448dd12b33eac52f269ed4 (diff) | |
download | online_test-cbeffec80d30fe2a9048644f5b0345f797479c92.tar.gz online_test-cbeffec80d30fe2a9048644f5b0345f797479c92.tar.bz2 online_test-cbeffec80d30fe2a9048644f5b0345f797479c92.zip |
Code review - changes as per code review discussion
- Further commonify and simplify code_server, fix bugs
Diffstat (limited to 'testapp')
-rwxr-xr-x | testapp/exam/code_server.py | 282 | ||||
-rw-r--r-- | testapp/exam/models.py | 14 | ||||
-rw-r--r-- | testapp/exam/xmlrpc_clients.py | 7 |
3 files changed, 150 insertions, 153 deletions
diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py index 2f1607c..5cff7dc 100755 --- a/testapp/exam/code_server.py +++ b/testapp/exam/code_server.py @@ -80,6 +80,9 @@ def delete_signal_handler(): class TestCode(object): """Evaluates and tests the code obtained from Code Server""" def __init__(self, test_parameter, language, user_answer, ref_code_path=None, in_dir=None): + msg = 'Code took more than %s seconds to run. You probably '\ + 'have an infinite loop in your code.' % SERVER_TIMEOUT + self.timeout_msg = msg self.test_parameter = test_parameter self.language = language self.user_answer = user_answer @@ -110,136 +113,144 @@ class TestCode(object): success = False prev_handler = None + methods = {"python": 'evaluate_python_code', + "bash": 'evaluate_bash_code', + "C": "evaluate_c_cpp_java_code", + "C++": "evaluate_c_cpp_java_code", + "java": "evaluate_c_cpp_java_code", + "scilab": "evaluate_scilab_code", + } + get_method_based_on_lang = methods[self.language] self._change_dir(self.in_dir) - if self.language == "python": + # Add a new signal handler for the execution of this code. + prev_handler = create_signal_handler() + success = False + + # Do whatever testing needed. + try: + evaluate_code = getattr(self, get_method_based_on_lang) + success, err = evaluate_code() + + except TimeoutException: + err = self.timeout_msg + except: + type, value = sys.exc_info()[:2] + err = "Error: {0}".format(repr(value)) + finally: + # Set back any original signal handler. + set_original_signal_handler(prev_handler) + + # Cancel the signal + delete_signal_handler() + + result = {'success': success, 'error': err} + return result + + def evaluate_python_code(self): + success = False + + try: tb = None - test_code = self.create_test_case() - # Add a new signal handler for the execution of this code. - prev_handler = create_signal_handler() + test_code = self._create_test_case() + submitted = compile(self.user_answer, '<string>', mode='exec') + g = {} + exec submitted in g + _tests = compile(test_code, '<string>', mode='exec') + exec _tests in g + except AssertionError: + type, value, tb = sys.exc_info() + info = traceback.extract_tb(tb) + fname, lineno, func, text = info[-1] + text = str(test_code).splitlines()[lineno-1] + err = "{0} {1} in: {2}".format(type.__name__, str(value), text) + else: + success = True + err = 'Correct answer' - try: - submitted = compile(self.user_answer, '<string>', mode='exec') - g = {} - exec submitted in g - _tests = compile(test_code, '<string>', mode='exec') - exec _tests in g - except TimeoutException: - err = self.timeout_msg - except AssertionError: - type, value, tb = sys.exc_info() - info = traceback.extract_tb(tb) - fname, lineno, func, text = info[-1] - text = str(test_code).splitlines()[lineno-1] - err = "{0} {1} in: {2}".format(type.__name__, str(value), text) - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - else: - success = True - err = 'Correct answer' - finally: - del tb - # Set back any original signal handler. - set_original_signal_handler(prev_handler) + del tb + return success, err + + def evaluate_c_cpp_java_code(self): + submit_path = self._create_submit_code_file() + ref_path, test_case_path = self._set_test_code_file_path() + success = False + + success, err = self._check_code(ref_path, submit_path) + + # Delete the created file. + os.remove(submit_path) + + return success, err - # Cancel the signal - delete_signal_handler() + def evaluate_scilab_code(self): + submit_path = self._create_submit_code_file() + ref_path, test_case_path = self._set_test_code_file_path() + success = False + cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path) + cmd += ' | timeout 8 scilab-cli -nb' + ret = self._run_command(cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc, stdout, stderr = ret + + # Get only the error. + stderr = self._get_error(stdout) + if stderr is None: + # Clean output + stdout = self._strip_output(stdout) + if proc.returncode == 5: + success, err = True, "Correct answer" + else: + err = add_err + stdout else: - user_answer_file = {'C': 'submit.c', 'java': 'Test.java', 'scilab': 'function.sci', - 'C++': 'submitstd.cpp', 'bash': 'submit.sh'} - - # File extension depending on the question language - submit_f = open(user_answer_file.get(self.language), 'w') - submit_f.write(self.user_answer.lstrip()) - submit_f.close() - submit_path = abspath(submit_f.name) - if self.language == "bash": - self._set_exec(submit_path) - - path_list = self.ref_code_path.split(',') - ref_path = path_list[0].strip() if path_list[0] else "" - test_case_path = path_list[1].strip() if path_list[1] else "" - if ref_path and not ref_path.startswith('/'): - ref_path = join(MY_DIR, ref_path) - if test_case_path and not test_case_path.startswith('/'): - test_case_path = join(MY_DIR, test_case_path) - - if self.language in ["C++", "C", "java"]: - # Add a new signal handler for the execution of this code. - prev_handler = create_signal_handler() - - # Do whatever testing needed. - try: - success, err = self._check_code(ref_path, submit_path) - except TimeoutException: - err = self.timeout_msg - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - set_original_signal_handler(prev_handler) + err = add_err + stderr - elif self.language == "scilab": - # Add a new signal handler for the execution of this code. - prev_handler = create_signal_handler() + # Delete the created file. + os.remove(submit_path) - # Do whatever testing needed. - try: - cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path) - cmd += ' | timeout 8 scilab-cli -nb' - ret = self._run_command(cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - - # Get only the error. - stderr = self._get_error(stdout) - if stderr is None: - # Clean output - stdout = self._strip_output(stdout) - if proc.returncode == 5: - success, err = True, "Correct answer" - else: - err = add_err + stdout - else: - err = add_err + stderr - except TimeoutException: - err = self.timeout_msg - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - set_original_signal_handler(prev_handler) + return success, err - elif self.language == "bash": - # Add a new signal handler for the execution of this code. - prev_handler = create_signal_handler() + def evaluate_bash_code(self): + submit_path = self._create_submit_code_file() + ref_path, test_case_path = self._set_test_code_file_path() + success = False - try: - success, err = self.check_bash_script(ref_path, submit_path, - test_case_path) - except TimeoutException: - err = self.timeout_msg - except: - type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - set_original_signal_handler(prev_handler) + success, err = self.check_bash_script(ref_path, submit_path, + test_case_path) - # Delete the created file. - os.remove(submit_path) + # Delete the created file. + os.remove(submit_path) - # Cancel the signal - delete_signal_handler() + return success, err - result = {'success': success, 'error': err} - return result + def _create_submit_code_file(self): + """ Write the code (`answer`) to a file and set the file path""" + user_answer_file = {'C': 'submit.c', 'java': 'Test.java', + 'scilab': 'function.sci', 'C++': 'submitstd.cpp', + 'bash': 'submit.sh'} + + # File extension depending on the question language + submit_f = open(user_answer_file.get(self.language), 'w') + submit_f.write(self.user_answer.lstrip()) + submit_f.close() + submit_path = abspath(submit_f.name) + if self.language == "bash": + self._set_exec(submit_path) + + return submit_path + + def _set_test_code_file_path(self): + ref_path, test_case_path = self.ref_code_path.split(',') + + if ref_path and not ref_path.startswith('/'): + ref_path = join(MY_DIR, ref_path) + if test_case_path and not test_case_path.startswith('/'): + test_case_path = join(MY_DIR, test_case_path) + + return ref_path, test_case_path def _check_code(self, ref_code_path, submit_code_path): """ Function validates student code using instructor code as @@ -303,7 +314,7 @@ class TestCode(object): success = False # output_path = os.getcwd() + '/output' - compile_command = language_dependent_var.get(self.language).get('compile_command') # "g++ %s -c -o %s" % (submit_code_path, output_path) + compile_command = language_dependent_var.get(self.language).get('compile_command') ret = self._compile_command(compile_command) proc, stdnt_stderr = ret if self.language == "java": @@ -312,7 +323,7 @@ class TestCode(object): # Only if compilation is successful, the program is executed # And tested with testcases if stdnt_stderr == '': - compile_main = language_dependent_var.get(self.language).get('compile_main') # "g++ %s %s -o %s" % (ref_code_path, output_path, executable) + compile_main = language_dependent_var.get(self.language).get('compile_main') ret = self._compile_command(compile_main) proc, main_err = ret if self.language == "java": @@ -328,7 +339,7 @@ class TestCode(object): success, err = True, "Correct answer" else: err = stdout + "\n" + stderr - os.remove(language_dependent_var.get(self.language).get('remove_ref_output')) # os.remove(executable) + os.remove(language_dependent_var.get(self.language).get('remove_ref_output')) else: err = "Error:" try: @@ -340,7 +351,7 @@ class TestCode(object): err = err + "\n" + e except: err = err + "\n" + main_err - os.remove(language_dependent_var.get(self.language).get('remove_user_output')) # os.remove(output_path) + os.remove(language_dependent_var.get(self.language).get('remove_user_output')) else: err = "Compilation Error:" try: @@ -353,8 +364,7 @@ class TestCode(object): except: err = err + "\n" + stdnt_stderr - result = {'success': success, 'error': err} - return result + return success, err def check_bash_script(self, ref_path, submit_path, test_case_path=None): @@ -519,22 +529,21 @@ class TestCode(object): | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) - def create_test_case(self): + def _create_test_case(self): """ Create assert based test cases in python """ - if self.language == "python": - test_code = "" - for test_case in self.test_parameter: - pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \ - else "" - kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \ - if test_case.get('kw_args') else "" - args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args - tcode = "assert {0}({1}) == {2}" \ - .format(test_case.get('func_name'), args, test_case.get('expected_answer')) - test_code += tcode + "\n" - return test_code + test_code = "" + for test_case in self.test_parameter: + pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \ + else "" + kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \ + if test_case.get('kw_args') else "" + args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args + tcode = "assert {0}({1}) == {2}" \ + .format(test_case.get('func_name'), args, test_case.get('expected_answer')) + test_code += tcode + "\n" + return test_code def _change_dir(self, in_dir): if in_dir is not None and isdir(in_dir): @@ -551,9 +560,6 @@ class CodeServer(object): def __init__(self, port, queue): self.port = port self.queue = queue - msg = 'Code took more than %s seconds to run. You probably '\ - 'have an infinite loop in your code.' % SERVER_TIMEOUT - self.timeout_msg = msg def checker(self, info_parameter, in_dir=None): """Calls the TestCode Class to test the current code""" diff --git a/testapp/exam/models.py b/testapp/exam/models.py index 706d864..1e7db2b 100644 --- a/testapp/exam/models.py +++ b/testapp/exam/models.py @@ -64,6 +64,7 @@ class Question(models.Model): solution = models.TextField(blank=True) # Test cases file paths (comma seperated for reference code path and test case code path) + # Applicable for CPP, C, Java and Scilab ref_code_path = models.TextField(blank=True) # Any multiple choice options. Place one option per line. @@ -112,11 +113,11 @@ class Question(models.Model): parameter_dict['pos_args'] = pos_args_list test_case_parameter.append(parameter_dict) - info_parameter['language'] = self.language - info_parameter['id'] = self.id - info_parameter['user_answer'] = user_answer - info_parameter['test_parameter'] = test_case_parameter - info_parameter['ref_code_path'] = self.ref_code_path + info_parameter['language'] = self.language + info_parameter['id'] = self.id + info_parameter['user_answer'] = user_answer + info_parameter['test_parameter'] = test_case_parameter + info_parameter['ref_code_path'] = self.ref_code_path return json.dumps(info_parameter) @@ -451,6 +452,3 @@ class TestCase(models.Model): # Test case Expected answer in list form expected_answer = models.TextField(blank=True, null = True) - - # # Test case path to system test code applicable for CPP, C, Java and Scilab - # ref_code_path = models.TextField(blank=True, null = True) diff --git a/testapp/exam/xmlrpc_clients.py b/testapp/exam/xmlrpc_clients.py index 2b0f0fa..5791dc6 100644 --- a/testapp/exam/xmlrpc_clients.py +++ b/testapp/exam/xmlrpc_clients.py @@ -21,13 +21,6 @@ class CodeServerProxy(object): def __init__(self): pool_url = 'http://localhost:%d' % (SERVER_POOL_PORT) self.pool_server = ServerProxy(pool_url) - self.methods = {"python": 'run_python_code', - "bash": 'run_bash_code', - "C": "run_c_code", - "C++": "run_cplus_code", - "java": "run_java_code", - "scilab": "run_scilab_code", - } def run_code(self, info_parameter, user_dir): """Tests given code (`answer`) with the `test_code` supplied. If the |