summaryrefslogtreecommitdiff
path: root/testapp
diff options
context:
space:
mode:
authorankitjavalkar2015-04-20 21:38:11 +0530
committerankitjavalkar2015-04-26 19:46:01 +0530
commitcbeffec80d30fe2a9048644f5b0345f797479c92 (patch)
treec544330859e16293fcae0fb71fab238fc4257520 /testapp
parent3d998f9d467ccfe795448dd12b33eac52f269ed4 (diff)
downloadonline_test-cbeffec80d30fe2a9048644f5b0345f797479c92.tar.gz
online_test-cbeffec80d30fe2a9048644f5b0345f797479c92.tar.bz2
online_test-cbeffec80d30fe2a9048644f5b0345f797479c92.zip
Code review - changes as per code review discussion
- Further commonify and simplify code_server, fix bugs
Diffstat (limited to 'testapp')
-rwxr-xr-xtestapp/exam/code_server.py282
-rw-r--r--testapp/exam/models.py14
-rw-r--r--testapp/exam/xmlrpc_clients.py7
3 files changed, 150 insertions, 153 deletions
diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py
index 2f1607c..5cff7dc 100755
--- a/testapp/exam/code_server.py
+++ b/testapp/exam/code_server.py
@@ -80,6 +80,9 @@ def delete_signal_handler():
class TestCode(object):
"""Evaluates and tests the code obtained from Code Server"""
def __init__(self, test_parameter, language, user_answer, ref_code_path=None, in_dir=None):
+ msg = 'Code took more than %s seconds to run. You probably '\
+ 'have an infinite loop in your code.' % SERVER_TIMEOUT
+ self.timeout_msg = msg
self.test_parameter = test_parameter
self.language = language
self.user_answer = user_answer
@@ -110,136 +113,144 @@ class TestCode(object):
success = False
prev_handler = None
+ methods = {"python": 'evaluate_python_code',
+ "bash": 'evaluate_bash_code',
+ "C": "evaluate_c_cpp_java_code",
+ "C++": "evaluate_c_cpp_java_code",
+ "java": "evaluate_c_cpp_java_code",
+ "scilab": "evaluate_scilab_code",
+ }
+ get_method_based_on_lang = methods[self.language]
self._change_dir(self.in_dir)
- if self.language == "python":
+ # Add a new signal handler for the execution of this code.
+ prev_handler = create_signal_handler()
+ success = False
+
+ # Do whatever testing needed.
+ try:
+ evaluate_code = getattr(self, get_method_based_on_lang)
+ success, err = evaluate_code()
+
+ except TimeoutException:
+ err = self.timeout_msg
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ finally:
+ # Set back any original signal handler.
+ set_original_signal_handler(prev_handler)
+
+ # Cancel the signal
+ delete_signal_handler()
+
+ result = {'success': success, 'error': err}
+ return result
+
+ def evaluate_python_code(self):
+ success = False
+
+ try:
tb = None
- test_code = self.create_test_case()
- # Add a new signal handler for the execution of this code.
- prev_handler = create_signal_handler()
+ test_code = self._create_test_case()
+ submitted = compile(self.user_answer, '<string>', mode='exec')
+ g = {}
+ exec submitted in g
+ _tests = compile(test_code, '<string>', mode='exec')
+ exec _tests in g
+ except AssertionError:
+ type, value, tb = sys.exc_info()
+ info = traceback.extract_tb(tb)
+ fname, lineno, func, text = info[-1]
+ text = str(test_code).splitlines()[lineno-1]
+ err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
+ else:
+ success = True
+ err = 'Correct answer'
- try:
- submitted = compile(self.user_answer, '<string>', mode='exec')
- g = {}
- exec submitted in g
- _tests = compile(test_code, '<string>', mode='exec')
- exec _tests in g
- except TimeoutException:
- err = self.timeout_msg
- except AssertionError:
- type, value, tb = sys.exc_info()
- info = traceback.extract_tb(tb)
- fname, lineno, func, text = info[-1]
- text = str(test_code).splitlines()[lineno-1]
- err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- else:
- success = True
- err = 'Correct answer'
- finally:
- del tb
- # Set back any original signal handler.
- set_original_signal_handler(prev_handler)
+ del tb
+ return success, err
+
+ def evaluate_c_cpp_java_code(self):
+ submit_path = self._create_submit_code_file()
+ ref_path, test_case_path = self._set_test_code_file_path()
+ success = False
+
+ success, err = self._check_code(ref_path, submit_path)
+
+ # Delete the created file.
+ os.remove(submit_path)
+
+ return success, err
- # Cancel the signal
- delete_signal_handler()
+ def evaluate_scilab_code(self):
+ submit_path = self._create_submit_code_file()
+ ref_path, test_case_path = self._set_test_code_file_path()
+ success = False
+ cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
+ cmd += ' | timeout 8 scilab-cli -nb'
+ ret = self._run_command(cmd,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, stdout, stderr = ret
+
+ # Get only the error.
+ stderr = self._get_error(stdout)
+ if stderr is None:
+ # Clean output
+ stdout = self._strip_output(stdout)
+ if proc.returncode == 5:
+ success, err = True, "Correct answer"
+ else:
+ err = add_err + stdout
else:
- user_answer_file = {'C': 'submit.c', 'java': 'Test.java', 'scilab': 'function.sci',
- 'C++': 'submitstd.cpp', 'bash': 'submit.sh'}
-
- # File extension depending on the question language
- submit_f = open(user_answer_file.get(self.language), 'w')
- submit_f.write(self.user_answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
- if self.language == "bash":
- self._set_exec(submit_path)
-
- path_list = self.ref_code_path.split(',')
- ref_path = path_list[0].strip() if path_list[0] else ""
- test_case_path = path_list[1].strip() if path_list[1] else ""
- if ref_path and not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
- if test_case_path and not test_case_path.startswith('/'):
- test_case_path = join(MY_DIR, test_case_path)
-
- if self.language in ["C++", "C", "java"]:
- # Add a new signal handler for the execution of this code.
- prev_handler = create_signal_handler()
-
- # Do whatever testing needed.
- try:
- success, err = self._check_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- set_original_signal_handler(prev_handler)
+ err = add_err + stderr
- elif self.language == "scilab":
- # Add a new signal handler for the execution of this code.
- prev_handler = create_signal_handler()
+ # Delete the created file.
+ os.remove(submit_path)
- # Do whatever testing needed.
- try:
- cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
- cmd += ' | timeout 8 scilab-cli -nb'
- ret = self._run_command(cmd,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
-
- # Get only the error.
- stderr = self._get_error(stdout)
- if stderr is None:
- # Clean output
- stdout = self._strip_output(stdout)
- if proc.returncode == 5:
- success, err = True, "Correct answer"
- else:
- err = add_err + stdout
- else:
- err = add_err + stderr
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- set_original_signal_handler(prev_handler)
+ return success, err
- elif self.language == "bash":
- # Add a new signal handler for the execution of this code.
- prev_handler = create_signal_handler()
+ def evaluate_bash_code(self):
+ submit_path = self._create_submit_code_file()
+ ref_path, test_case_path = self._set_test_code_file_path()
+ success = False
- try:
- success, err = self.check_bash_script(ref_path, submit_path,
- test_case_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- set_original_signal_handler(prev_handler)
+ success, err = self.check_bash_script(ref_path, submit_path,
+ test_case_path)
- # Delete the created file.
- os.remove(submit_path)
+ # Delete the created file.
+ os.remove(submit_path)
- # Cancel the signal
- delete_signal_handler()
+ return success, err
- result = {'success': success, 'error': err}
- return result
+ def _create_submit_code_file(self):
+ """ Write the code (`answer`) to a file and set the file path"""
+ user_answer_file = {'C': 'submit.c', 'java': 'Test.java',
+ 'scilab': 'function.sci', 'C++': 'submitstd.cpp',
+ 'bash': 'submit.sh'}
+
+ # File extension depending on the question language
+ submit_f = open(user_answer_file.get(self.language), 'w')
+ submit_f.write(self.user_answer.lstrip())
+ submit_f.close()
+ submit_path = abspath(submit_f.name)
+ if self.language == "bash":
+ self._set_exec(submit_path)
+
+ return submit_path
+
+ def _set_test_code_file_path(self):
+ ref_path, test_case_path = self.ref_code_path.split(',')
+
+ if ref_path and not ref_path.startswith('/'):
+ ref_path = join(MY_DIR, ref_path)
+ if test_case_path and not test_case_path.startswith('/'):
+ test_case_path = join(MY_DIR, test_case_path)
+
+ return ref_path, test_case_path
def _check_code(self, ref_code_path, submit_code_path):
""" Function validates student code using instructor code as
@@ -303,7 +314,7 @@ class TestCode(object):
success = False
# output_path = os.getcwd() + '/output'
- compile_command = language_dependent_var.get(self.language).get('compile_command') # "g++ %s -c -o %s" % (submit_code_path, output_path)
+ compile_command = language_dependent_var.get(self.language).get('compile_command')
ret = self._compile_command(compile_command)
proc, stdnt_stderr = ret
if self.language == "java":
@@ -312,7 +323,7 @@ class TestCode(object):
# Only if compilation is successful, the program is executed
# And tested with testcases
if stdnt_stderr == '':
- compile_main = language_dependent_var.get(self.language).get('compile_main') # "g++ %s %s -o %s" % (ref_code_path, output_path, executable)
+ compile_main = language_dependent_var.get(self.language).get('compile_main')
ret = self._compile_command(compile_main)
proc, main_err = ret
if self.language == "java":
@@ -328,7 +339,7 @@ class TestCode(object):
success, err = True, "Correct answer"
else:
err = stdout + "\n" + stderr
- os.remove(language_dependent_var.get(self.language).get('remove_ref_output')) # os.remove(executable)
+ os.remove(language_dependent_var.get(self.language).get('remove_ref_output'))
else:
err = "Error:"
try:
@@ -340,7 +351,7 @@ class TestCode(object):
err = err + "\n" + e
except:
err = err + "\n" + main_err
- os.remove(language_dependent_var.get(self.language).get('remove_user_output')) # os.remove(output_path)
+ os.remove(language_dependent_var.get(self.language).get('remove_user_output'))
else:
err = "Compilation Error:"
try:
@@ -353,8 +364,7 @@ class TestCode(object):
except:
err = err + "\n" + stdnt_stderr
- result = {'success': success, 'error': err}
- return result
+ return success, err
def check_bash_script(self, ref_path, submit_path,
test_case_path=None):
@@ -519,22 +529,21 @@ class TestCode(object):
| stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
| stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
- def create_test_case(self):
+ def _create_test_case(self):
"""
Create assert based test cases in python
"""
- if self.language == "python":
- test_code = ""
- for test_case in self.test_parameter:
- pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
- else ""
- kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
- if test_case.get('kw_args') else ""
- args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
- tcode = "assert {0}({1}) == {2}" \
- .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
- test_code += tcode + "\n"
- return test_code
+ test_code = ""
+ for test_case in self.test_parameter:
+ pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
+ else ""
+ kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
+ if test_case.get('kw_args') else ""
+ args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
+ tcode = "assert {0}({1}) == {2}" \
+ .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
+ test_code += tcode + "\n"
+ return test_code
def _change_dir(self, in_dir):
if in_dir is not None and isdir(in_dir):
@@ -551,9 +560,6 @@ class CodeServer(object):
def __init__(self, port, queue):
self.port = port
self.queue = queue
- msg = 'Code took more than %s seconds to run. You probably '\
- 'have an infinite loop in your code.' % SERVER_TIMEOUT
- self.timeout_msg = msg
def checker(self, info_parameter, in_dir=None):
"""Calls the TestCode Class to test the current code"""
diff --git a/testapp/exam/models.py b/testapp/exam/models.py
index 706d864..1e7db2b 100644
--- a/testapp/exam/models.py
+++ b/testapp/exam/models.py
@@ -64,6 +64,7 @@ class Question(models.Model):
solution = models.TextField(blank=True)
# Test cases file paths (comma seperated for reference code path and test case code path)
+ # Applicable for CPP, C, Java and Scilab
ref_code_path = models.TextField(blank=True)
# Any multiple choice options. Place one option per line.
@@ -112,11 +113,11 @@ class Question(models.Model):
parameter_dict['pos_args'] = pos_args_list
test_case_parameter.append(parameter_dict)
- info_parameter['language'] = self.language
- info_parameter['id'] = self.id
- info_parameter['user_answer'] = user_answer
- info_parameter['test_parameter'] = test_case_parameter
- info_parameter['ref_code_path'] = self.ref_code_path
+ info_parameter['language'] = self.language
+ info_parameter['id'] = self.id
+ info_parameter['user_answer'] = user_answer
+ info_parameter['test_parameter'] = test_case_parameter
+ info_parameter['ref_code_path'] = self.ref_code_path
return json.dumps(info_parameter)
@@ -451,6 +452,3 @@ class TestCase(models.Model):
# Test case Expected answer in list form
expected_answer = models.TextField(blank=True, null = True)
-
- # # Test case path to system test code applicable for CPP, C, Java and Scilab
- # ref_code_path = models.TextField(blank=True, null = True)
diff --git a/testapp/exam/xmlrpc_clients.py b/testapp/exam/xmlrpc_clients.py
index 2b0f0fa..5791dc6 100644
--- a/testapp/exam/xmlrpc_clients.py
+++ b/testapp/exam/xmlrpc_clients.py
@@ -21,13 +21,6 @@ class CodeServerProxy(object):
def __init__(self):
pool_url = 'http://localhost:%d' % (SERVER_POOL_PORT)
self.pool_server = ServerProxy(pool_url)
- self.methods = {"python": 'run_python_code',
- "bash": 'run_bash_code',
- "C": "run_c_code",
- "C++": "run_cplus_code",
- "java": "run_java_code",
- "scilab": "run_scilab_code",
- }
def run_code(self, info_parameter, user_dir):
"""Tests given code (`answer`) with the `test_code` supplied. If the