summaryrefslogtreecommitdiff
path: root/testapp
diff options
context:
space:
mode:
authorankitjavalkar2015-03-05 12:27:02 +0530
committerankitjavalkar2015-04-26 19:43:19 +0530
commit9440bff5ae69c1d27f5c9622ca15cb8c603c6174 (patch)
tree761e4c37f02244232ce876233e026f1c0e894ad6 /testapp
parent8e2469e937fd4f80ebf2053d6e21c9b670d38ea2 (diff)
downloadonline_test-9440bff5ae69c1d27f5c9622ca15cb8c603c6174.tar.gz
online_test-9440bff5ae69c1d27f5c9622ca15cb8c603c6174.tar.bz2
online_test-9440bff5ae69c1d27f5c9622ca15cb8c603c6174.zip
Code Server code cleanup and code commonification
- Pass question and test case info as json string (info_parameter) - Return success status and error message as a json string - Embed user answer and question lang in info_parameter - Commonify Python code evaluations and assertion test - Deprecate individual function call based on language
Diffstat (limited to 'testapp')
-rwxr-xr-xtestapp/exam/code_server.py1521
-rw-r--r--testapp/exam/models.py14
-rw-r--r--testapp/exam/templates/exam/add_question.html14
-rw-r--r--testapp/exam/views.py21
-rw-r--r--testapp/exam/xmlrpc_clients.py22
5 files changed, 904 insertions, 688 deletions
diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py
index c34971f..fa5d916 100755
--- a/testapp/exam/code_server.py
+++ b/testapp/exam/code_server.py
@@ -29,6 +29,7 @@ import signal
from multiprocessing import Process, Queue
import subprocess
import re
+import json
# Local imports.
from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT
@@ -53,6 +54,131 @@ def timeout_handler(signum, frame):
"""A handler for the ALARM signal."""
raise TimeoutException('Code took too long to run.')
+def create_delete_signal_handler(action=None, old_handler=None):
+ if action == "new": # Add a new signal handler for the execution of this code.
+ prev_handler = signal.signal(signal.SIGALRM, timeout_handler)
+ signal.alarm(SERVER_TIMEOUT)
+ return prev_handler
+ elif action == "original": # Set back any original signal handler.
+ if old_handler is not None:
+ signal.signal(signal.SIGALRM, old_handler)
+ return
+ else:
+ raise Exception("Signal Handler: object cannot be NoneType")
+ elif action == "delete": # Cancel the signal if any, see signal.alarm documentation.
+ signal.alarm(0)
+ return
+ else:
+ raise Exception("Signal Handler: action not specified")
+
+
+###############################################################################
+# `TestCode` class.
+###############################################################################
+class TestCode(object):
+ """Evaluates and tests the code obtained from Code Server"""
+ def __init__(self, info_parameter, in_dir=None):
+ info_parameter = json.loads(info_parameter)
+ self.test_parameter = info_parameter.get("test_parameter")
+ self.language = info_parameter.get("language")
+ self.user_answer = info_parameter.get("user_answer")
+ self.in_dir = in_dir
+
+ def run_code(self):
+ self._change_dir(self.in_dir)
+
+ # Create test cases
+ test_code = self.create_test_case()
+ # Evaluate, run code and obtain result
+ result = self.evaluate_code(test_code)
+
+ return result
+
+ def evaluate_code(self, test_code):
+ success = False
+ prev_handler = None
+
+ if self.language == "python":
+ tb = None
+ prev_handler = create_delete_signal_handler("new")
+
+ try:
+ submitted = compile(self.user_answer, '<string>', mode='exec')
+ g = {}
+ exec submitted in g
+ _tests = compile(test_code, '<string>', mode='exec')
+ exec _tests in g
+ except TimeoutException:
+ err = self.timeout_msg
+ except AssertionError:
+ type, value, tb = sys.exc_info()
+ info = traceback.extract_tb(tb)
+ fname, lineno, func, text = info[-1]
+ text = str(test_code).splitlines()[lineno-1]
+ err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ else:
+ success = True
+ err = 'Correct answer'
+ finally:
+ del tb
+ # Set back any original signal handler.
+ create_delete_signal_handler("original", prev_handler)
+
+ # Cancel the signal
+ create_delete_signal_handler("delete")
+
+ if self.language == "c++":
+ pass
+
+ if self.language == "c":
+ pass
+
+ if self.language == "java":
+ pass
+
+ if self.language == "scilab":
+ pass
+
+ result = {'success': success, 'error': err}
+ return result
+
+ def compile_code(self):
+ pass
+
+ def create_test_case(self):
+ # Create assert based test cases in python
+ if self.language == "python":
+ test_code = ""
+ for test_case in self.test_parameter:
+ pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
+ else ""
+ kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
+ if test_case.get('kw_args') else ""
+ args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
+ tcode = "assert {0}({1}) == {2}" \
+ .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
+ test_code += tcode + "\n"
+ return test_code
+
+ if self.language == "c++":
+ pass
+
+ if self.language == "c":
+ pass
+
+ if self.language == "java":
+ pass
+
+ if self.language == "scilab":
+ pass
+
+ def _change_dir(self, in_dir):
+ if in_dir is not None and isdir(in_dir):
+ os.chdir(in_dir)
+
###############################################################################
# `CodeServer` class.
@@ -68,6 +194,16 @@ class CodeServer(object):
'have an infinite loop in your code.' % SERVER_TIMEOUT
self.timeout_msg = msg
+ def checker(self, info_parameter, in_dir=None):
+ """Calls the TestCode Class to test the current code"""
+ tc = TestCode(info_parameter, in_dir)
+ result = tc.run_code()
+
+ # Put us back into the server pool queue since we are free now.
+ self.queue.put(self.port)
+
+ return json.dumps(result)
+
def run_python_code(self, answer, test_parameter, in_dir=None):
"""Tests given Python function (`answer`) with the `test_code`
supplied. If the optional `in_dir` keyword argument is supplied
@@ -88,6 +224,7 @@ class CodeServer(object):
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(SERVER_TIMEOUT)
+ test_parameter = json.loads(test_parameter)
success = False
tb = None
@@ -135,659 +272,741 @@ class CodeServer(object):
result = {'success': success, 'error': err}
return result
- def run_bash_code(self, answer, test_parameter, in_dir=None):
- """Tests given Bash code (`answer`) with the `test_code` supplied.
-
- The testcode should typically contain two lines, the first is a path to
- the reference script we are to compare against. The second is a path
- to the arguments to be supplied to the reference and submitted script.
- The output of these will be compared for correctness.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- def _set_exec(fname):
- os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
- | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
- | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
- submit_f = open('submit.sh', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
- _set_exec(submit_path)
-
- # ref path and path to arguments is a comma seperated string obtained
- #from ref_code_path field in TestCase Mode
- path_list = test_parameter.get('ref_code_path').split(',')
- ref_path = path_list[0].strip() if path_list[0] else ""
- test_case_path = path_list[1].strip() if path_list[1] else ""
-
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
- if not test_case_path.startswith('/'):
- test_case_path = join(MY_DIR, test_case_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self.check_bash_script(ref_path, submit_path,
- test_case_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def _run_command(self, cmd_args, *args, **kw):
- """Run a command in a subprocess while blocking, the process is killed
- if it takes more than 2 seconds to run. Return the Popen object, the
- stdout and stderr.
- """
- try:
- proc = subprocess.Popen(cmd_args, *args, **kw)
- stdout, stderr = proc.communicate()
- except TimeoutException:
- # Runaway code, so kill it.
- proc.kill()
- # Re-raise exception.
- raise
- return proc, stdout, stderr
-
- def check_bash_script(self, ref_script_path, submit_script_path,
- test_case_path=None):
- """ Function validates student script using instructor script as
- reference. Test cases can optionally be provided. The first argument
- ref_script_path, is the path to instructor script, it is assumed to
- have executable permission. The second argument submit_script_path, is
- the path to the student script, it is assumed to have executable
- permission. The Third optional argument is the path to test the
- scripts. Each line in this file is a test case and each test case is
- passed to the script as standard arguments.
-
- Returns
- --------
-
- returns (True, "Correct answer") : If the student script passes all
- test cases/have same output, when compared to the instructor script
-
- returns (False, error_msg): If the student script fails a single
- test/have dissimilar output, when compared to the instructor script.
-
- Returns (False, error_msg): If mandatory arguments are not files or if
- the required permissions are not given to the file(s).
-
- """
- if not ref_script_path:
- return False, "No Test Script path found"
- if not isfile(ref_script_path):
- return False, "No file at %s" % ref_script_path
- if not isfile(submit_script_path):
- return False, 'No file at %s' % submit_script_path
- if not os.access(ref_script_path, os.X_OK):
- return False, 'Script %s is not executable' % ref_script_path
- if not os.access(submit_script_path, os.X_OK):
- return False, 'Script %s is not executable' % submit_script_path
-
- if test_case_path is None or "":
- ret = self._run_command(ref_script_path, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, inst_stdout, inst_stderr = ret
- ret = self._run_command(submit_script_path, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdnt_stdout, stdnt_stderr = ret
- if inst_stdout == stdnt_stdout:
- return True, 'Correct answer'
- else:
- err = "Error: expected %s, got %s" % (inst_stderr,
- stdnt_stderr)
- return False, err
- else:
- if not isfile(test_case_path):
- return False, "No test case at %s" % test_case_path
- if not os.access(ref_script_path, os.R_OK):
- return False, "Test script %s, not readable" % test_case_path
- valid_answer = True # We initially make it one, so that we can
- # stop once a test case fails
- loop_count = 0 # Loop count has to be greater than or
- # equal to one.
- # Useful for caching things like empty
- # test files,etc.
- test_cases = open(test_case_path).readlines()
- num_lines = len(test_cases)
- for test_case in test_cases:
- loop_count += 1
- if valid_answer:
- args = [ref_script_path] + [x for x in test_case.split()]
- ret = self._run_command(args, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, inst_stdout, inst_stderr = ret
- args = [submit_script_path]+[x for x in test_case.split()]
- ret = self._run_command(args, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdnt_stdout, stdnt_stderr = ret
- valid_answer = inst_stdout == stdnt_stdout
- if valid_answer and (num_lines == loop_count):
- return True, "Correct answer"
- else:
- err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
- stdnt_stdout+stdnt_stderr)
- return False, err
-
- def run_c_code(self, answer, test_parameter, in_dir=None):
- """Tests given C code (`answer`) with the `test_code` supplied.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # File extension must be .c
- submit_f = open('submit.c', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self._check_c_cpp_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def _compile_command(self, cmd, *args, **kw):
- """Compiles C/C++/java code and returns errors if any.
- Run a command in a subprocess while blocking, the process is killed
- if it takes more than 2 seconds to run. Return the Popen object, the
- stderr.
- """
- try:
- proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- out, err = proc_compile.communicate()
- except TimeoutException:
- # Runaway code, so kill it.
- proc_compile.kill()
- # Re-raise exception.
- raise
- return proc_compile, err
-
- def _check_c_cpp_code(self, ref_code_path, submit_code_path):
- """ Function validates student code using instructor code as
- reference.The first argument ref_code_path, is the path to
- instructor code, it is assumed to have executable permission.
- The second argument submit_code_path, is the path to the student
- code, it is assumed to have executable permission.
-
- Returns
- --------
-
- returns (True, "Correct answer") : If the student function returns
- expected output when called by reference code.
-
- returns (False, error_msg): If the student function fails to return
- expected output when called by reference code.
-
- Returns (False, error_msg): If mandatory arguments are not files or
- if the required permissions are not given to the file(s).
-
- """
- if not isfile(ref_code_path):
- return False, "No file at %s" % ref_code_path
- if not isfile(submit_code_path):
- return False, 'No file at %s' % submit_code_path
-
- success = False
- output_path = os.getcwd() + '/output'
- compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path)
- ret = self._compile_command(compile_command)
- proc, stdnt_stderr = ret
-
- # Only if compilation is successful, the program is executed
- # And tested with testcases
- if stdnt_stderr == '':
- executable = os.getcwd() + '/executable'
- compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path,
- executable)
- ret = self._compile_command(compile_main)
- proc, main_err = ret
- if main_err == '':
- args = [executable]
- ret = self._run_command(args, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
- if proc.returncode == 0:
- success, err = True, "Correct answer"
- else:
- err = stdout + "\n" + stderr
- os.remove(executable)
- else:
- err = "Error:"
- try:
- error_lines = main_err.splitlines()
- for e in error_lines:
- err = err + "\n" + e.split(":", 1)[1]
- except:
- err = err + "\n" + main_err
- os.remove(output_path)
- else:
- err = "Compilation Error:"
- try:
- error_lines = stdnt_stderr.splitlines()
- for e in error_lines:
- if ':' in e:
- err = err + "\n" + e.split(":", 1)[1]
- else:
- err = err + "\n" + e
- except:
- err = err + "\n" + stdnt_stderr
- return success, err
-
- def run_cplus_code(self, answer, test_parameter, in_dir=None):
- """Tests given C++ code (`answer`) with the `test_code` supplied.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # The file extension must be .cpp
- submit_f = open('submitstd.cpp', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self._check_c_cpp_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def run_java_code(self, answer, test_parameter, in_dir=None):
- """Tests given java code (`answer`) with the `test_code` supplied.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # The file extension must be .java
- # The class name and file name must be same in java
- submit_f = open('Test.java', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self._check_java_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- return success, err
-
- def _check_java_code(self, ref_code_path, submit_code_path):
- """ Function validates student code using instructor code as
- reference.The first argument ref_code_path, is the path to
- instructor code, it is assumed to have executable permission.
- The second argument submit_code_path, is the path to the student
- code, it is assumed to have executable permission.
-
- Returns
- --------
-
- returns (True, "Correct answer") : If the student function returns
- expected output when called by reference code.
-
- returns (False, error_msg): If the student function fails to return
- expected output when called by reference code.
-
- Returns (False, error_msg): If mandatory arguments are not files or
- if the required permissions are not given to the file(s).
-
- """
- if not isfile(ref_code_path):
- return False, "No file at %s" % ref_code_path
- if not isfile(submit_code_path):
- return False, 'No file at %s' % submit_code_path
-
- success = False
- compile_command = "javac %s" % (submit_code_path)
- ret = self._compile_command(compile_command)
- proc, stdnt_stderr = ret
- stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
-
- # Only if compilation is successful, the program is executed
- # And tested with testcases
- if stdnt_stderr == '':
- student_directory = os.getcwd() + '/'
- student_file_name = "Test"
- compile_main = "javac %s -classpath %s -d %s" % (ref_code_path,
- student_directory,
- student_directory)
- ret = self._compile_command(compile_main)
- proc, main_err = ret
- main_err = self._remove_null_substitute_char(main_err)
-
- if main_err == '':
- main_file_name = (ref_code_path.split('/')[-1]).split('.')[0]
- run_command = "java -cp %s %s" % (student_directory,
- main_file_name)
- ret = self._run_command(run_command,
- stdin=None,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
- if proc.returncode == 0:
- success, err = True, "Correct answer"
- else:
- err = stdout + "\n" + stderr
- success = False
- os.remove("%s%s.class" % (student_directory, main_file_name))
- else:
- err = "Error:\n"
- try:
- error_lines = main_err.splitlines()
- for e in error_lines:
- if ':' in e:
- err = err + "\n" + e.split(":", 1)[1]
- else:
- err = err + "\n" + e
- except:
- err = err + "\n" + main_err
- os.remove("%s%s.class" % (student_directory, student_file_name))
- else:
- err = "Compilation Error:\n"
- try:
- error_lines = stdnt_stderr.splitlines()
- for e in error_lines:
- if ':' in e:
- err = err + "\n" + e.split(":", 1)[1]
- else:
- err = err + "\n" + e
- except:
- err = err + "\n" + stdnt_stderr
- result = {'success': success, 'error': err}
- return result
-
- def _remove_null_substitute_char(self, string):
- """Returns a string without any null and substitute characters"""
- stripped = ""
- for c in string:
- if ord(c) is not 26 and ord(c) is not 0:
- stripped = stripped + c
- return ''.join(stripped)
+# ###############################################################################
+# # `CodeServer` class.
+# ###############################################################################
+# class CodeServer(object):
+# """A code server that executes user submitted test code, tests it and
+# reports if the code was correct or not.
+# """
+# def __init__(self, port, queue):
+# self.port = port
+# self.queue = queue
+# msg = 'Code took more than %s seconds to run. You probably '\
+# 'have an infinite loop in your code.' % SERVER_TIMEOUT
+# self.timeout_msg = msg
+
+# def run_python_code(self, answer, test_parameter, in_dir=None):
+# """Tests given Python function (`answer`) with the `test_code`
+# supplied. If the optional `in_dir` keyword argument is supplied
+# it changes the directory to that directory (it does not change
+# it back to the original when done). This function also timesout
+# when the function takes more than SERVER_TIMEOUT seconds to run
+# to prevent runaway code.
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# test_parameter = json.loads(test_parameter)
+# success = False
+# tb = None
+
+# test_code = ""
+# for test_case in test_parameter:
+# pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
+# else ""
+# kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
+# if test_case.get('kw_args') else ""
+# args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
+# tcode = "assert {0}({1}) == {2}" \
+# .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
+# test_code += tcode + "\n"
+# try:
+# submitted = compile(answer, '<string>', mode='exec')
+# g = {}
+# exec submitted in g
+# _tests = compile(test_code, '<string>', mode='exec')
+# exec _tests in g
+# except TimeoutException:
+# err = self.timeout_msg
+# except AssertionError:
+# type, value, tb = sys.exc_info()
+# info = traceback.extract_tb(tb)
+# fname, lineno, func, text = info[-1]
+# text = str(test_code).splitlines()[lineno-1]
+# err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# else:
+# success = True
+# err = 'Correct answer'
+# finally:
+# del tb
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def run_bash_code(self, answer, test_parameter, in_dir=None):
+# """Tests given Bash code (`answer`) with the `test_code` supplied.
+
+# The testcode should typically contain two lines, the first is a path to
+# the reference script we are to compare against. The second is a path
+# to the arguments to be supplied to the reference and submitted script.
+# The output of these will be compared for correctness.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# def _set_exec(fname):
+# os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
+# | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
+# | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
+# submit_f = open('submit.sh', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+# _set_exec(submit_path)
+
+# # ref path and path to arguments is a comma seperated string obtained
+# #from ref_code_path field in TestCase Mode
+# path_list = test_parameter.get('ref_code_path').split(',')
+# ref_path = path_list[0].strip() if path_list[0] else ""
+# test_case_path = path_list[1].strip() if path_list[1] else ""
+
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+# if not test_case_path.startswith('/'):
+# test_case_path = join(MY_DIR, test_case_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self.check_bash_script(ref_path, submit_path,
+# test_case_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def _run_command(self, cmd_args, *args, **kw):
+# """Run a command in a subprocess while blocking, the process is killed
+# if it takes more than 2 seconds to run. Return the Popen object, the
+# stdout and stderr.
+# """
+# try:
+# proc = subprocess.Popen(cmd_args, *args, **kw)
+# stdout, stderr = proc.communicate()
+# except TimeoutException:
+# # Runaway code, so kill it.
+# proc.kill()
+# # Re-raise exception.
+# raise
+# return proc, stdout, stderr
+
+# def check_bash_script(self, ref_script_path, submit_script_path,
+# test_case_path=None):
+# """ Function validates student script using instructor script as
+# reference. Test cases can optionally be provided. The first argument
+# ref_script_path, is the path to instructor script, it is assumed to
+# have executable permission. The second argument submit_script_path, is
+# the path to the student script, it is assumed to have executable
+# permission. The Third optional argument is the path to test the
+# scripts. Each line in this file is a test case and each test case is
+# passed to the script as standard arguments.
+
+# Returns
+# --------
+
+# returns (True, "Correct answer") : If the student script passes all
+# test cases/have same output, when compared to the instructor script
+
+# returns (False, error_msg): If the student script fails a single
+# test/have dissimilar output, when compared to the instructor script.
+
+# Returns (False, error_msg): If mandatory arguments are not files or if
+# the required permissions are not given to the file(s).
+
+# """
+# if not ref_script_path:
+# return False, "No Test Script path found"
+# if not isfile(ref_script_path):
+# return False, "No file at %s" % ref_script_path
+# if not isfile(submit_script_path):
+# return False, 'No file at %s' % submit_script_path
+# if not os.access(ref_script_path, os.X_OK):
+# return False, 'Script %s is not executable' % ref_script_path
+# if not os.access(submit_script_path, os.X_OK):
+# return False, 'Script %s is not executable' % submit_script_path
+
+# if test_case_path is None or "":
+# ret = self._run_command(ref_script_path, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, inst_stdout, inst_stderr = ret
+# ret = self._run_command(submit_script_path, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdnt_stdout, stdnt_stderr = ret
+# if inst_stdout == stdnt_stdout:
+# return True, 'Correct answer'
+# else:
+# err = "Error: expected %s, got %s" % (inst_stderr,
+# stdnt_stderr)
+# return False, err
+# else:
+# if not isfile(test_case_path):
+# return False, "No test case at %s" % test_case_path
+# if not os.access(ref_script_path, os.R_OK):
+# return False, "Test script %s, not readable" % test_case_path
+# valid_answer = True # We initially make it one, so that we can
+# # stop once a test case fails
+# loop_count = 0 # Loop count has to be greater than or
+# # equal to one.
+# # Useful for caching things like empty
+# # test files,etc.
+# test_cases = open(test_case_path).readlines()
+# num_lines = len(test_cases)
+# for test_case in test_cases:
+# loop_count += 1
+# if valid_answer:
+# args = [ref_script_path] + [x for x in test_case.split()]
+# ret = self._run_command(args, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, inst_stdout, inst_stderr = ret
+# args = [submit_script_path]+[x for x in test_case.split()]
+# ret = self._run_command(args, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdnt_stdout, stdnt_stderr = ret
+# valid_answer = inst_stdout == stdnt_stdout
+# if valid_answer and (num_lines == loop_count):
+# return True, "Correct answer"
+# else:
+# err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
+# stdnt_stdout+stdnt_stderr)
+# return False, err
+
+# def run_c_code(self, answer, test_parameter, in_dir=None):
+# """Tests given C code (`answer`) with the `test_code` supplied.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # File extension must be .c
+# submit_f = open('submit.c', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self._check_c_cpp_code(ref_path, submit_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def _compile_command(self, cmd, *args, **kw):
+# """Compiles C/C++/java code and returns errors if any.
+# Run a command in a subprocess while blocking, the process is killed
+# if it takes more than 2 seconds to run. Return the Popen object, the
+# stderr.
+# """
+# try:
+# proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# out, err = proc_compile.communicate()
+# except TimeoutException:
+# # Runaway code, so kill it.
+# proc_compile.kill()
+# # Re-raise exception.
+# raise
+# return proc_compile, err
+
+# def _check_c_cpp_code(self, ref_code_path, submit_code_path):
+# """ Function validates student code using instructor code as
+# reference.The first argument ref_code_path, is the path to
+# instructor code, it is assumed to have executable permission.
+# The second argument submit_code_path, is the path to the student
+# code, it is assumed to have executable permission.
+
+# Returns
+# --------
+
+# returns (True, "Correct answer") : If the student function returns
+# expected output when called by reference code.
+
+# returns (False, error_msg): If the student function fails to return
+# expected output when called by reference code.
+
+# Returns (False, error_msg): If mandatory arguments are not files or
+# if the required permissions are not given to the file(s).
+
+# """
+# if not isfile(ref_code_path):
+# return False, "No file at %s" % ref_code_path
+# if not isfile(submit_code_path):
+# return False, 'No file at %s' % submit_code_path
+
+# success = False
+# output_path = os.getcwd() + '/output'
+# compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path)
+# ret = self._compile_command(compile_command)
+# proc, stdnt_stderr = ret
+
+# # Only if compilation is successful, the program is executed
+# # And tested with testcases
+# if stdnt_stderr == '':
+# executable = os.getcwd() + '/executable'
+# compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path,
+# executable)
+# ret = self._compile_command(compile_main)
+# proc, main_err = ret
+# if main_err == '':
+# args = [executable]
+# ret = self._run_command(args, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdout, stderr = ret
+# if proc.returncode == 0:
+# success, err = True, "Correct answer"
+# else:
+# err = stdout + "\n" + stderr
+# os.remove(executable)
+# else:
+# err = "Error:"
+# try:
+# error_lines = main_err.splitlines()
+# for e in error_lines:
+# err = err + "\n" + e.split(":", 1)[1]
+# except:
+# err = err + "\n" + main_err
+# os.remove(output_path)
+# else:
+# err = "Compilation Error:"
+# try:
+# error_lines = stdnt_stderr.splitlines()
+# for e in error_lines:
+# if ':' in e:
+# err = err + "\n" + e.split(":", 1)[1]
+# else:
+# err = err + "\n" + e
+# except:
+# err = err + "\n" + stdnt_stderr
+# return success, err
+
+# def run_cplus_code(self, answer, test_parameter, in_dir=None):
+# """Tests given C++ code (`answer`) with the `test_code` supplied.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # The file extension must be .cpp
+# submit_f = open('submitstd.cpp', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self._check_c_cpp_code(ref_path, submit_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def run_java_code(self, answer, test_parameter, in_dir=None):
+# """Tests given java code (`answer`) with the `test_code` supplied.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # The file extension must be .java
+# # The class name and file name must be same in java
+# submit_f = open('Test.java', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self._check_java_code(ref_path, submit_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# return success, err
+
+# def _check_java_code(self, ref_code_path, submit_code_path):
+# """ Function validates student code using instructor code as
+# reference.The first argument ref_code_path, is the path to
+# instructor code, it is assumed to have executable permission.
+# The second argument submit_code_path, is the path to the student
+# code, it is assumed to have executable permission.
+
+# Returns
+# --------
+
+# returns (True, "Correct answer") : If the student function returns
+# expected output when called by reference code.
+
+# returns (False, error_msg): If the student function fails to return
+# expected output when called by reference code.
+
+# Returns (False, error_msg): If mandatory arguments are not files or
+# if the required permissions are not given to the file(s).
+
+# """
+# if not isfile(ref_code_path):
+# return False, "No file at %s" % ref_code_path
+# if not isfile(submit_code_path):
+# return False, 'No file at %s' % submit_code_path
+
+# success = False
+# compile_command = "javac %s" % (submit_code_path)
+# ret = self._compile_command(compile_command)
+# proc, stdnt_stderr = ret
+# stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
+
+# # Only if compilation is successful, the program is executed
+# # And tested with testcases
+# if stdnt_stderr == '':
+# student_directory = os.getcwd() + '/'
+# student_file_name = "Test"
+# compile_main = "javac %s -classpath %s -d %s" % (ref_code_path,
+# student_directory,
+# student_directory)
+# ret = self._compile_command(compile_main)
+# proc, main_err = ret
+# main_err = self._remove_null_substitute_char(main_err)
+
+# if main_err == '':
+# main_file_name = (ref_code_path.split('/')[-1]).split('.')[0]
+# run_command = "java -cp %s %s" % (student_directory,
+# main_file_name)
+# ret = self._run_command(run_command,
+# stdin=None,
+# shell=True,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdout, stderr = ret
+# if proc.returncode == 0:
+# success, err = True, "Correct answer"
+# else:
+# err = stdout + "\n" + stderr
+# success = False
+# os.remove("%s%s.class" % (student_directory, main_file_name))
+# else:
+# err = "Error:\n"
+# try:
+# error_lines = main_err.splitlines()
+# for e in error_lines:
+# if ':' in e:
+# err = err + "\n" + e.split(":", 1)[1]
+# else:
+# err = err + "\n" + e
+# except:
+# err = err + "\n" + main_err
+# os.remove("%s%s.class" % (student_directory, student_file_name))
+# else:
+# err = "Compilation Error:\n"
+# try:
+# error_lines = stdnt_stderr.splitlines()
+# for e in error_lines:
+# if ':' in e:
+# err = err + "\n" + e.split(":", 1)[1]
+# else:
+# err = err + "\n" + e
+# except:
+# err = err + "\n" + stdnt_stderr
+# result = {'success': success, 'error': err}
+# return result
+
+# def _remove_null_substitute_char(self, string):
+# """Returns a string without any null and substitute characters"""
+# stripped = ""
+# for c in string:
+# if ord(c) is not 26 and ord(c) is not 0:
+# stripped = stripped + c
+# return ''.join(stripped)
- def run_scilab_code(self, answer, test_parameter, in_dir=None):
- """Tests given Scilab function (`answer`) with the `test_code`
- supplied. If the optional `in_dir` keyword argument is supplied
- it changes the directory to that directory (it does not change
- it back to the original when done). This function also timesout
- when the function takes more than SERVER_TIMEOUT seconds to run
- to prevent runaway code.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
+# def run_scilab_code(self, answer, test_parameter, in_dir=None):
+# """Tests given Scilab function (`answer`) with the `test_code`
+# supplied. If the optional `in_dir` keyword argument is supplied
+# it changes the directory to that directory (it does not change
+# it back to the original when done). This function also timesout
+# when the function takes more than SERVER_TIMEOUT seconds to run
+# to prevent runaway code.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
- # Removes all the commands that terminates scilab
- answer,i = self._remove_scilab_exit(answer.lstrip())
-
- # Throw message if there are commmands that terminates scilab
- add_err=""
- if i > 0:
- add_err = "Please do not use exit, quit and abort commands in your\
- code.\n Otherwise your code will not be evaluated\
- correctly.\n"
-
- # The file extension should be .sci
- submit_f = open('function.sci','w')
- submit_f.write(answer)
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
- cmd += ' | timeout 8 scilab-cli -nb'
- ret = self._run_command(cmd,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
-
- # Get only the error.
- stderr = self._get_error(stdout)
- if stderr is None:
- # Clean output
- stdout = self._strip_output(stdout)
- if proc.returncode == 5:
- success, err = True, "Correct answer"
- else:
- err = add_err + stdout
- else:
- err = add_err + stderr
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def _remove_scilab_exit(self, string):
- """
- Removes exit, quit and abort from the scilab code
- """
- new_string = ""
- i=0
- for line in string.splitlines():
- new_line = re.sub(r"exit.*$","",line)
- new_line = re.sub(r"quit.*$","",new_line)
- new_line = re.sub(r"abort.*$","",new_line)
- if line != new_line:
- i=i+1
- new_string = new_string +'\n'+ new_line
- return new_string, i
+# # Removes all the commands that terminates scilab
+# answer,i = self._remove_scilab_exit(answer.lstrip())
+
+# # Throw message if there are commmands that terminates scilab
+# add_err=""
+# if i > 0:
+# add_err = "Please do not use exit, quit and abort commands in your\
+# code.\n Otherwise your code will not be evaluated\
+# correctly.\n"
+
+# # The file extension should be .sci
+# submit_f = open('function.sci','w')
+# submit_f.write(answer)
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
+# cmd += ' | timeout 8 scilab-cli -nb'
+# ret = self._run_command(cmd,
+# shell=True,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdout, stderr = ret
+
+# # Get only the error.
+# stderr = self._get_error(stdout)
+# if stderr is None:
+# # Clean output
+# stdout = self._strip_output(stdout)
+# if proc.returncode == 5:
+# success, err = True, "Correct answer"
+# else:
+# err = add_err + stdout
+# else:
+# err = add_err + stderr
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def _remove_scilab_exit(self, string):
+# """
+# Removes exit, quit and abort from the scilab code
+# """
+# new_string = ""
+# i=0
+# for line in string.splitlines():
+# new_line = re.sub(r"exit.*$","",line)
+# new_line = re.sub(r"quit.*$","",new_line)
+# new_line = re.sub(r"abort.*$","",new_line)
+# if line != new_line:
+# i=i+1
+# new_string = new_string +'\n'+ new_line
+# return new_string, i
def _get_error(self, string):
"""
diff --git a/testapp/exam/models.py b/testapp/exam/models.py
index ea79ad2..dd0c806 100644
--- a/testapp/exam/models.py
+++ b/testapp/exam/models.py
@@ -1,4 +1,5 @@
import datetime
+import json
from random import sample, shuffle
from django.db import models
from django.contrib.auth.models import User
@@ -82,8 +83,9 @@ class Question(models.Model):
# Tags for the Question.
tags = TaggableManager()
- def consolidate_test_cases(self, test):
- test_case_parameter= []
+ def consolidate_answer_data(self, test, user_answer):
+ test_case_parameter = []
+ info_parameter = {}
for i in test:
kw_args_dict = {}
@@ -94,7 +96,6 @@ class Question(models.Model):
parameter_dict['func_name'] = i.func_name
parameter_dict['expected_answer'] = i.expected_answer
parameter_dict['ref_code_path'] = i.ref_code_path
- parameter_dict['language'] = self.language
if i.kw_args:
for args in i.kw_args.split(","):
@@ -109,7 +110,12 @@ class Question(models.Model):
parameter_dict['pos_args'] = pos_args_list
test_case_parameter.append(parameter_dict)
- return test_case_parameter
+ info_parameter['language'] = self.language
+ info_parameter['id'] = self.id
+ info_parameter['user_answer'] = user_answer
+ info_parameter['test_parameter'] = test_case_parameter
+
+ return json.dumps(info_parameter)
def __unicode__(self):
return self.summary
diff --git a/testapp/exam/templates/exam/add_question.html b/testapp/exam/templates/exam/add_question.html
index d989b81..e5e2574 100644
--- a/testapp/exam/templates/exam/add_question.html
+++ b/testapp/exam/templates/exam/add_question.html
@@ -40,20 +40,6 @@
{% endfor %}
{% endif %}
</form>
-<!-- <form method="post" action="">
- <center><table class=span1>
- {{ formset.management_form }}
- {% for form in formset %}
- {{ form.id }}
- <tr><td>Question <td>{{ form.question }}
- <tr><td>Function Name <td>{{ form.func_name }}
- <tr><td>Keyword argument <td>{{ form.kw_args }}
- <tr><td>Positional Argument <td>{{ form.pos_args }}
- <tr><td>Expected Answer <td>{{ form.expected_answer }}
- <tr><td>Code Path <td>{{ form.ref_code_path }}
- {% endfor %}
- </form> -->
- <!-- end -->
</table></center>
<center><button class="btn" type="submit" name="add_test">Add Test Case</button> <!-- -->
<button class="btn" type="submit" name="delete_test">Remove Test Case</button> <!-- -->
diff --git a/testapp/exam/views.py b/testapp/exam/views.py
index 3d22d55..8aef5ab 100644
--- a/testapp/exam/views.py
+++ b/testapp/exam/views.py
@@ -14,6 +14,7 @@ from django.db.models import Sum
from django.views.decorators.csrf import csrf_exempt
from taggit.models import Tag
from itertools import chain
+import json
# Local imports.
from testapp.exam.models import Quiz, Question, QuestionPaper, QuestionSet
from testapp.exam.models import Profile, Answer, AnswerPaper, User, TestCase
@@ -909,7 +910,6 @@ def check(request, q_id, attempt_num=None, questionpaper_id=None):
q_paper = QuestionPaper.objects.get(id=questionpaper_id)
paper = AnswerPaper.objects.get(user=request.user, question_paper=q_paper)
test = TestCase.objects.filter(question=question)
- test_parameter = question.consolidate_test_cases(test)
snippet_code = request.POST.get('snippet')
user_code = request.POST.get('answer')
@@ -940,7 +940,7 @@ def check(request, q_id, attempt_num=None, questionpaper_id=None):
user_answer = 'ASSIGNMENT UPLOADED'
else:
user_code = request.POST.get('answer')
- user_answer = user_code #snippet_code + "\n" + user_code
+ user_answer = snippet_code + "\n" + user_code if snippet_code else user_code
new_answer = Answer(question=question, answer=user_answer,
correct=False)
@@ -951,14 +951,16 @@ def check(request, q_id, attempt_num=None, questionpaper_id=None):
# questions, we obtain the results via XML-RPC with the code executed
# safely in a separate process (the code_server.py) running as nobody.
if not question.type == 'upload':
- correct, result = validate_answer(user, user_answer, question, test_parameter)
+ if question.type == 'code':
+ info_parameter = question.consolidate_answer_data(test, user_answer)
+ correct, result = validate_answer(user, user_answer, question, info_parameter)
if correct:
new_answer.correct = correct
new_answer.marks = question.points
- new_answer.error = err_msg
+ new_answer.error = result.get('error')
success_msg = True
else:
- new_answer.error = err_msg
+ new_answer.error = result.get('error')
new_answer.save()
time_left = paper.time_left()
@@ -996,7 +998,7 @@ def check(request, q_id, attempt_num=None, questionpaper_id=None):
questionpaper_id, success_msg)
-def validate_answer(user, user_answer, question, test_parameter):
+def validate_answer(user, user_answer, question, info_parameter=None):
"""
Checks whether the answer submitted by the user is right or wrong.
If right then returns correct = True, success and
@@ -1011,17 +1013,18 @@ def validate_answer(user, user_answer, question, test_parameter):
if user_answer is not None:
if question.type == 'mcq':
- if user_answer.strip() == question.test.strip():
+ if user_answer.strip() == question.test.strip(): ####add question.answer/question.solution instead of test
correct = True
message = 'Correct answer'
elif question.type == 'mcc':
- answers = set(question.test.splitlines())
+ answers = set(question.test.splitlines()) ####add question.answer/question.solution instead of test
if set(user_answer) == answers:
correct = True
message = 'Correct answer'
elif question.type == 'code':
user_dir = get_user_dir(user)
- result = code_server.run_code(user_answer, test_parameter, user_dir, question.language)
+ json_result = code_server.run_code(info_parameter, user_dir)
+ result = json.loads(json_result)
if result.get('success'):
correct = True
diff --git a/testapp/exam/xmlrpc_clients.py b/testapp/exam/xmlrpc_clients.py
index 692fbb5..bbd6110 100644
--- a/testapp/exam/xmlrpc_clients.py
+++ b/testapp/exam/xmlrpc_clients.py
@@ -29,7 +29,7 @@ class CodeServerProxy(object):
"scilab": "run_scilab_code",
}
- def run_code(self, answer, test_parameter, user_dir, language):
+ def run_code(self, info_parameter, user_dir):
"""Tests given code (`answer`) with the `test_code` supplied. If the
optional `in_dir` keyword argument is supplied it changes the directory
to that directory (it does not change it back to the original when
@@ -38,26 +38,28 @@ class CodeServerProxy(object):
Parameters
----------
- answer : str
- The user's answer for the question.
+ info_parameter contains;
+ user_answer : str
+ The user's answer for the question.
test_code : str
The test code to check the user code with.
- user_dir : str (directory)
- The directory to run the tests inside.
language : str
The programming language to use.
+ user_dir : str (directory)
+ The directory to run the tests inside.
+
+
Returns
-------
- A tuple: (success, error message).
+ A json string of a dict: {success: success, err: error message}.
"""
- method_name = self.methods[language]
+ # method_name = self.methods[language]
try:
server = self._get_server()
- method = getattr(server, method_name)
- result = method(answer, test_parameter, user_dir) ####
+ result = server.checker(info_parameter, user_dir)
except ConnectionError:
- result = {'success': False, 'error': 'Unable to connect to any code servers!'}
+ result = json.dumps({'success': False, 'error': 'Unable to connect to any code servers!'})
return result
def _get_server(self):