summaryrefslogtreecommitdiff
path: root/testapp/exam/code_server.py
diff options
context:
space:
mode:
authorankitjavalkar2015-03-05 12:27:02 +0530
committerankitjavalkar2015-04-26 19:43:19 +0530
commit9440bff5ae69c1d27f5c9622ca15cb8c603c6174 (patch)
tree761e4c37f02244232ce876233e026f1c0e894ad6 /testapp/exam/code_server.py
parent8e2469e937fd4f80ebf2053d6e21c9b670d38ea2 (diff)
downloadonline_test-9440bff5ae69c1d27f5c9622ca15cb8c603c6174.tar.gz
online_test-9440bff5ae69c1d27f5c9622ca15cb8c603c6174.tar.bz2
online_test-9440bff5ae69c1d27f5c9622ca15cb8c603c6174.zip
Code Server code cleanup and code commonification
- Pass question and test case info as json string (info_parameter) - Return success status and error message as a json string - Embed user answer and question lang in info_parameter - Commonify Python code evaluations and assertion test - Deprecate individual function call based on language
Diffstat (limited to 'testapp/exam/code_server.py')
-rwxr-xr-xtestapp/exam/code_server.py1521
1 files changed, 870 insertions, 651 deletions
diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py
index c34971f..fa5d916 100755
--- a/testapp/exam/code_server.py
+++ b/testapp/exam/code_server.py
@@ -29,6 +29,7 @@ import signal
from multiprocessing import Process, Queue
import subprocess
import re
+import json
# Local imports.
from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT
@@ -53,6 +54,131 @@ def timeout_handler(signum, frame):
"""A handler for the ALARM signal."""
raise TimeoutException('Code took too long to run.')
+def create_delete_signal_handler(action=None, old_handler=None):
+ if action == "new": # Add a new signal handler for the execution of this code.
+ prev_handler = signal.signal(signal.SIGALRM, timeout_handler)
+ signal.alarm(SERVER_TIMEOUT)
+ return prev_handler
+ elif action == "original": # Set back any original signal handler.
+ if old_handler is not None:
+ signal.signal(signal.SIGALRM, old_handler)
+ return
+ else:
+ raise Exception("Signal Handler: object cannot be NoneType")
+ elif action == "delete": # Cancel the signal if any, see signal.alarm documentation.
+ signal.alarm(0)
+ return
+ else:
+ raise Exception("Signal Handler: action not specified")
+
+
+###############################################################################
+# `TestCode` class.
+###############################################################################
+class TestCode(object):
+ """Evaluates and tests the code obtained from Code Server"""
+ def __init__(self, info_parameter, in_dir=None):
+ info_parameter = json.loads(info_parameter)
+ self.test_parameter = info_parameter.get("test_parameter")
+ self.language = info_parameter.get("language")
+ self.user_answer = info_parameter.get("user_answer")
+ self.in_dir = in_dir
+
+ def run_code(self):
+ self._change_dir(self.in_dir)
+
+ # Create test cases
+ test_code = self.create_test_case()
+ # Evaluate, run code and obtain result
+ result = self.evaluate_code(test_code)
+
+ return result
+
+ def evaluate_code(self, test_code):
+ success = False
+ prev_handler = None
+
+ if self.language == "python":
+ tb = None
+ prev_handler = create_delete_signal_handler("new")
+
+ try:
+ submitted = compile(self.user_answer, '<string>', mode='exec')
+ g = {}
+ exec submitted in g
+ _tests = compile(test_code, '<string>', mode='exec')
+ exec _tests in g
+ except TimeoutException:
+ err = self.timeout_msg
+ except AssertionError:
+ type, value, tb = sys.exc_info()
+ info = traceback.extract_tb(tb)
+ fname, lineno, func, text = info[-1]
+ text = str(test_code).splitlines()[lineno-1]
+ err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ else:
+ success = True
+ err = 'Correct answer'
+ finally:
+ del tb
+ # Set back any original signal handler.
+ create_delete_signal_handler("original", prev_handler)
+
+ # Cancel the signal
+ create_delete_signal_handler("delete")
+
+ if self.language == "c++":
+ pass
+
+ if self.language == "c":
+ pass
+
+ if self.language == "java":
+ pass
+
+ if self.language == "scilab":
+ pass
+
+ result = {'success': success, 'error': err}
+ return result
+
+ def compile_code(self):
+ pass
+
+ def create_test_case(self):
+ # Create assert based test cases in python
+ if self.language == "python":
+ test_code = ""
+ for test_case in self.test_parameter:
+ pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
+ else ""
+ kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
+ if test_case.get('kw_args') else ""
+ args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
+ tcode = "assert {0}({1}) == {2}" \
+ .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
+ test_code += tcode + "\n"
+ return test_code
+
+ if self.language == "c++":
+ pass
+
+ if self.language == "c":
+ pass
+
+ if self.language == "java":
+ pass
+
+ if self.language == "scilab":
+ pass
+
+ def _change_dir(self, in_dir):
+ if in_dir is not None and isdir(in_dir):
+ os.chdir(in_dir)
+
###############################################################################
# `CodeServer` class.
@@ -68,6 +194,16 @@ class CodeServer(object):
'have an infinite loop in your code.' % SERVER_TIMEOUT
self.timeout_msg = msg
+ def checker(self, info_parameter, in_dir=None):
+ """Calls the TestCode Class to test the current code"""
+ tc = TestCode(info_parameter, in_dir)
+ result = tc.run_code()
+
+ # Put us back into the server pool queue since we are free now.
+ self.queue.put(self.port)
+
+ return json.dumps(result)
+
def run_python_code(self, answer, test_parameter, in_dir=None):
"""Tests given Python function (`answer`) with the `test_code`
supplied. If the optional `in_dir` keyword argument is supplied
@@ -88,6 +224,7 @@ class CodeServer(object):
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(SERVER_TIMEOUT)
+ test_parameter = json.loads(test_parameter)
success = False
tb = None
@@ -135,659 +272,741 @@ class CodeServer(object):
result = {'success': success, 'error': err}
return result
- def run_bash_code(self, answer, test_parameter, in_dir=None):
- """Tests given Bash code (`answer`) with the `test_code` supplied.
-
- The testcode should typically contain two lines, the first is a path to
- the reference script we are to compare against. The second is a path
- to the arguments to be supplied to the reference and submitted script.
- The output of these will be compared for correctness.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- def _set_exec(fname):
- os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
- | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
- | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
- submit_f = open('submit.sh', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
- _set_exec(submit_path)
-
- # ref path and path to arguments is a comma seperated string obtained
- #from ref_code_path field in TestCase Mode
- path_list = test_parameter.get('ref_code_path').split(',')
- ref_path = path_list[0].strip() if path_list[0] else ""
- test_case_path = path_list[1].strip() if path_list[1] else ""
-
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
- if not test_case_path.startswith('/'):
- test_case_path = join(MY_DIR, test_case_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self.check_bash_script(ref_path, submit_path,
- test_case_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def _run_command(self, cmd_args, *args, **kw):
- """Run a command in a subprocess while blocking, the process is killed
- if it takes more than 2 seconds to run. Return the Popen object, the
- stdout and stderr.
- """
- try:
- proc = subprocess.Popen(cmd_args, *args, **kw)
- stdout, stderr = proc.communicate()
- except TimeoutException:
- # Runaway code, so kill it.
- proc.kill()
- # Re-raise exception.
- raise
- return proc, stdout, stderr
-
- def check_bash_script(self, ref_script_path, submit_script_path,
- test_case_path=None):
- """ Function validates student script using instructor script as
- reference. Test cases can optionally be provided. The first argument
- ref_script_path, is the path to instructor script, it is assumed to
- have executable permission. The second argument submit_script_path, is
- the path to the student script, it is assumed to have executable
- permission. The Third optional argument is the path to test the
- scripts. Each line in this file is a test case and each test case is
- passed to the script as standard arguments.
-
- Returns
- --------
-
- returns (True, "Correct answer") : If the student script passes all
- test cases/have same output, when compared to the instructor script
-
- returns (False, error_msg): If the student script fails a single
- test/have dissimilar output, when compared to the instructor script.
-
- Returns (False, error_msg): If mandatory arguments are not files or if
- the required permissions are not given to the file(s).
-
- """
- if not ref_script_path:
- return False, "No Test Script path found"
- if not isfile(ref_script_path):
- return False, "No file at %s" % ref_script_path
- if not isfile(submit_script_path):
- return False, 'No file at %s' % submit_script_path
- if not os.access(ref_script_path, os.X_OK):
- return False, 'Script %s is not executable' % ref_script_path
- if not os.access(submit_script_path, os.X_OK):
- return False, 'Script %s is not executable' % submit_script_path
-
- if test_case_path is None or "":
- ret = self._run_command(ref_script_path, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, inst_stdout, inst_stderr = ret
- ret = self._run_command(submit_script_path, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdnt_stdout, stdnt_stderr = ret
- if inst_stdout == stdnt_stdout:
- return True, 'Correct answer'
- else:
- err = "Error: expected %s, got %s" % (inst_stderr,
- stdnt_stderr)
- return False, err
- else:
- if not isfile(test_case_path):
- return False, "No test case at %s" % test_case_path
- if not os.access(ref_script_path, os.R_OK):
- return False, "Test script %s, not readable" % test_case_path
- valid_answer = True # We initially make it one, so that we can
- # stop once a test case fails
- loop_count = 0 # Loop count has to be greater than or
- # equal to one.
- # Useful for caching things like empty
- # test files,etc.
- test_cases = open(test_case_path).readlines()
- num_lines = len(test_cases)
- for test_case in test_cases:
- loop_count += 1
- if valid_answer:
- args = [ref_script_path] + [x for x in test_case.split()]
- ret = self._run_command(args, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, inst_stdout, inst_stderr = ret
- args = [submit_script_path]+[x for x in test_case.split()]
- ret = self._run_command(args, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdnt_stdout, stdnt_stderr = ret
- valid_answer = inst_stdout == stdnt_stdout
- if valid_answer and (num_lines == loop_count):
- return True, "Correct answer"
- else:
- err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
- stdnt_stdout+stdnt_stderr)
- return False, err
-
- def run_c_code(self, answer, test_parameter, in_dir=None):
- """Tests given C code (`answer`) with the `test_code` supplied.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # File extension must be .c
- submit_f = open('submit.c', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self._check_c_cpp_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def _compile_command(self, cmd, *args, **kw):
- """Compiles C/C++/java code and returns errors if any.
- Run a command in a subprocess while blocking, the process is killed
- if it takes more than 2 seconds to run. Return the Popen object, the
- stderr.
- """
- try:
- proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- out, err = proc_compile.communicate()
- except TimeoutException:
- # Runaway code, so kill it.
- proc_compile.kill()
- # Re-raise exception.
- raise
- return proc_compile, err
-
- def _check_c_cpp_code(self, ref_code_path, submit_code_path):
- """ Function validates student code using instructor code as
- reference.The first argument ref_code_path, is the path to
- instructor code, it is assumed to have executable permission.
- The second argument submit_code_path, is the path to the student
- code, it is assumed to have executable permission.
-
- Returns
- --------
-
- returns (True, "Correct answer") : If the student function returns
- expected output when called by reference code.
-
- returns (False, error_msg): If the student function fails to return
- expected output when called by reference code.
-
- Returns (False, error_msg): If mandatory arguments are not files or
- if the required permissions are not given to the file(s).
-
- """
- if not isfile(ref_code_path):
- return False, "No file at %s" % ref_code_path
- if not isfile(submit_code_path):
- return False, 'No file at %s' % submit_code_path
-
- success = False
- output_path = os.getcwd() + '/output'
- compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path)
- ret = self._compile_command(compile_command)
- proc, stdnt_stderr = ret
-
- # Only if compilation is successful, the program is executed
- # And tested with testcases
- if stdnt_stderr == '':
- executable = os.getcwd() + '/executable'
- compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path,
- executable)
- ret = self._compile_command(compile_main)
- proc, main_err = ret
- if main_err == '':
- args = [executable]
- ret = self._run_command(args, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
- if proc.returncode == 0:
- success, err = True, "Correct answer"
- else:
- err = stdout + "\n" + stderr
- os.remove(executable)
- else:
- err = "Error:"
- try:
- error_lines = main_err.splitlines()
- for e in error_lines:
- err = err + "\n" + e.split(":", 1)[1]
- except:
- err = err + "\n" + main_err
- os.remove(output_path)
- else:
- err = "Compilation Error:"
- try:
- error_lines = stdnt_stderr.splitlines()
- for e in error_lines:
- if ':' in e:
- err = err + "\n" + e.split(":", 1)[1]
- else:
- err = err + "\n" + e
- except:
- err = err + "\n" + stdnt_stderr
- return success, err
-
- def run_cplus_code(self, answer, test_parameter, in_dir=None):
- """Tests given C++ code (`answer`) with the `test_code` supplied.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # The file extension must be .cpp
- submit_f = open('submitstd.cpp', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self._check_c_cpp_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def run_java_code(self, answer, test_parameter, in_dir=None):
- """Tests given java code (`answer`) with the `test_code` supplied.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # The file extension must be .java
- # The class name and file name must be same in java
- submit_f = open('Test.java', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self._check_java_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- return success, err
-
- def _check_java_code(self, ref_code_path, submit_code_path):
- """ Function validates student code using instructor code as
- reference.The first argument ref_code_path, is the path to
- instructor code, it is assumed to have executable permission.
- The second argument submit_code_path, is the path to the student
- code, it is assumed to have executable permission.
-
- Returns
- --------
-
- returns (True, "Correct answer") : If the student function returns
- expected output when called by reference code.
-
- returns (False, error_msg): If the student function fails to return
- expected output when called by reference code.
-
- Returns (False, error_msg): If mandatory arguments are not files or
- if the required permissions are not given to the file(s).
-
- """
- if not isfile(ref_code_path):
- return False, "No file at %s" % ref_code_path
- if not isfile(submit_code_path):
- return False, 'No file at %s' % submit_code_path
-
- success = False
- compile_command = "javac %s" % (submit_code_path)
- ret = self._compile_command(compile_command)
- proc, stdnt_stderr = ret
- stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
-
- # Only if compilation is successful, the program is executed
- # And tested with testcases
- if stdnt_stderr == '':
- student_directory = os.getcwd() + '/'
- student_file_name = "Test"
- compile_main = "javac %s -classpath %s -d %s" % (ref_code_path,
- student_directory,
- student_directory)
- ret = self._compile_command(compile_main)
- proc, main_err = ret
- main_err = self._remove_null_substitute_char(main_err)
-
- if main_err == '':
- main_file_name = (ref_code_path.split('/')[-1]).split('.')[0]
- run_command = "java -cp %s %s" % (student_directory,
- main_file_name)
- ret = self._run_command(run_command,
- stdin=None,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
- if proc.returncode == 0:
- success, err = True, "Correct answer"
- else:
- err = stdout + "\n" + stderr
- success = False
- os.remove("%s%s.class" % (student_directory, main_file_name))
- else:
- err = "Error:\n"
- try:
- error_lines = main_err.splitlines()
- for e in error_lines:
- if ':' in e:
- err = err + "\n" + e.split(":", 1)[1]
- else:
- err = err + "\n" + e
- except:
- err = err + "\n" + main_err
- os.remove("%s%s.class" % (student_directory, student_file_name))
- else:
- err = "Compilation Error:\n"
- try:
- error_lines = stdnt_stderr.splitlines()
- for e in error_lines:
- if ':' in e:
- err = err + "\n" + e.split(":", 1)[1]
- else:
- err = err + "\n" + e
- except:
- err = err + "\n" + stdnt_stderr
- result = {'success': success, 'error': err}
- return result
-
- def _remove_null_substitute_char(self, string):
- """Returns a string without any null and substitute characters"""
- stripped = ""
- for c in string:
- if ord(c) is not 26 and ord(c) is not 0:
- stripped = stripped + c
- return ''.join(stripped)
+# ###############################################################################
+# # `CodeServer` class.
+# ###############################################################################
+# class CodeServer(object):
+# """A code server that executes user submitted test code, tests it and
+# reports if the code was correct or not.
+# """
+# def __init__(self, port, queue):
+# self.port = port
+# self.queue = queue
+# msg = 'Code took more than %s seconds to run. You probably '\
+# 'have an infinite loop in your code.' % SERVER_TIMEOUT
+# self.timeout_msg = msg
+
+# def run_python_code(self, answer, test_parameter, in_dir=None):
+# """Tests given Python function (`answer`) with the `test_code`
+# supplied. If the optional `in_dir` keyword argument is supplied
+# it changes the directory to that directory (it does not change
+# it back to the original when done). This function also timesout
+# when the function takes more than SERVER_TIMEOUT seconds to run
+# to prevent runaway code.
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# test_parameter = json.loads(test_parameter)
+# success = False
+# tb = None
+
+# test_code = ""
+# for test_case in test_parameter:
+# pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
+# else ""
+# kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
+# if test_case.get('kw_args') else ""
+# args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
+# tcode = "assert {0}({1}) == {2}" \
+# .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
+# test_code += tcode + "\n"
+# try:
+# submitted = compile(answer, '<string>', mode='exec')
+# g = {}
+# exec submitted in g
+# _tests = compile(test_code, '<string>', mode='exec')
+# exec _tests in g
+# except TimeoutException:
+# err = self.timeout_msg
+# except AssertionError:
+# type, value, tb = sys.exc_info()
+# info = traceback.extract_tb(tb)
+# fname, lineno, func, text = info[-1]
+# text = str(test_code).splitlines()[lineno-1]
+# err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# else:
+# success = True
+# err = 'Correct answer'
+# finally:
+# del tb
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def run_bash_code(self, answer, test_parameter, in_dir=None):
+# """Tests given Bash code (`answer`) with the `test_code` supplied.
+
+# The testcode should typically contain two lines, the first is a path to
+# the reference script we are to compare against. The second is a path
+# to the arguments to be supplied to the reference and submitted script.
+# The output of these will be compared for correctness.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# def _set_exec(fname):
+# os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
+# | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
+# | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
+# submit_f = open('submit.sh', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+# _set_exec(submit_path)
+
+# # ref path and path to arguments is a comma seperated string obtained
+# #from ref_code_path field in TestCase Mode
+# path_list = test_parameter.get('ref_code_path').split(',')
+# ref_path = path_list[0].strip() if path_list[0] else ""
+# test_case_path = path_list[1].strip() if path_list[1] else ""
+
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+# if not test_case_path.startswith('/'):
+# test_case_path = join(MY_DIR, test_case_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self.check_bash_script(ref_path, submit_path,
+# test_case_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def _run_command(self, cmd_args, *args, **kw):
+# """Run a command in a subprocess while blocking, the process is killed
+# if it takes more than 2 seconds to run. Return the Popen object, the
+# stdout and stderr.
+# """
+# try:
+# proc = subprocess.Popen(cmd_args, *args, **kw)
+# stdout, stderr = proc.communicate()
+# except TimeoutException:
+# # Runaway code, so kill it.
+# proc.kill()
+# # Re-raise exception.
+# raise
+# return proc, stdout, stderr
+
+# def check_bash_script(self, ref_script_path, submit_script_path,
+# test_case_path=None):
+# """ Function validates student script using instructor script as
+# reference. Test cases can optionally be provided. The first argument
+# ref_script_path, is the path to instructor script, it is assumed to
+# have executable permission. The second argument submit_script_path, is
+# the path to the student script, it is assumed to have executable
+# permission. The Third optional argument is the path to test the
+# scripts. Each line in this file is a test case and each test case is
+# passed to the script as standard arguments.
+
+# Returns
+# --------
+
+# returns (True, "Correct answer") : If the student script passes all
+# test cases/have same output, when compared to the instructor script
+
+# returns (False, error_msg): If the student script fails a single
+# test/have dissimilar output, when compared to the instructor script.
+
+# Returns (False, error_msg): If mandatory arguments are not files or if
+# the required permissions are not given to the file(s).
+
+# """
+# if not ref_script_path:
+# return False, "No Test Script path found"
+# if not isfile(ref_script_path):
+# return False, "No file at %s" % ref_script_path
+# if not isfile(submit_script_path):
+# return False, 'No file at %s' % submit_script_path
+# if not os.access(ref_script_path, os.X_OK):
+# return False, 'Script %s is not executable' % ref_script_path
+# if not os.access(submit_script_path, os.X_OK):
+# return False, 'Script %s is not executable' % submit_script_path
+
+# if test_case_path is None or "":
+# ret = self._run_command(ref_script_path, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, inst_stdout, inst_stderr = ret
+# ret = self._run_command(submit_script_path, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdnt_stdout, stdnt_stderr = ret
+# if inst_stdout == stdnt_stdout:
+# return True, 'Correct answer'
+# else:
+# err = "Error: expected %s, got %s" % (inst_stderr,
+# stdnt_stderr)
+# return False, err
+# else:
+# if not isfile(test_case_path):
+# return False, "No test case at %s" % test_case_path
+# if not os.access(ref_script_path, os.R_OK):
+# return False, "Test script %s, not readable" % test_case_path
+# valid_answer = True # We initially make it one, so that we can
+# # stop once a test case fails
+# loop_count = 0 # Loop count has to be greater than or
+# # equal to one.
+# # Useful for caching things like empty
+# # test files,etc.
+# test_cases = open(test_case_path).readlines()
+# num_lines = len(test_cases)
+# for test_case in test_cases:
+# loop_count += 1
+# if valid_answer:
+# args = [ref_script_path] + [x for x in test_case.split()]
+# ret = self._run_command(args, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, inst_stdout, inst_stderr = ret
+# args = [submit_script_path]+[x for x in test_case.split()]
+# ret = self._run_command(args, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdnt_stdout, stdnt_stderr = ret
+# valid_answer = inst_stdout == stdnt_stdout
+# if valid_answer and (num_lines == loop_count):
+# return True, "Correct answer"
+# else:
+# err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
+# stdnt_stdout+stdnt_stderr)
+# return False, err
+
+# def run_c_code(self, answer, test_parameter, in_dir=None):
+# """Tests given C code (`answer`) with the `test_code` supplied.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # File extension must be .c
+# submit_f = open('submit.c', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self._check_c_cpp_code(ref_path, submit_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def _compile_command(self, cmd, *args, **kw):
+# """Compiles C/C++/java code and returns errors if any.
+# Run a command in a subprocess while blocking, the process is killed
+# if it takes more than 2 seconds to run. Return the Popen object, the
+# stderr.
+# """
+# try:
+# proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# out, err = proc_compile.communicate()
+# except TimeoutException:
+# # Runaway code, so kill it.
+# proc_compile.kill()
+# # Re-raise exception.
+# raise
+# return proc_compile, err
+
+# def _check_c_cpp_code(self, ref_code_path, submit_code_path):
+# """ Function validates student code using instructor code as
+# reference.The first argument ref_code_path, is the path to
+# instructor code, it is assumed to have executable permission.
+# The second argument submit_code_path, is the path to the student
+# code, it is assumed to have executable permission.
+
+# Returns
+# --------
+
+# returns (True, "Correct answer") : If the student function returns
+# expected output when called by reference code.
+
+# returns (False, error_msg): If the student function fails to return
+# expected output when called by reference code.
+
+# Returns (False, error_msg): If mandatory arguments are not files or
+# if the required permissions are not given to the file(s).
+
+# """
+# if not isfile(ref_code_path):
+# return False, "No file at %s" % ref_code_path
+# if not isfile(submit_code_path):
+# return False, 'No file at %s' % submit_code_path
+
+# success = False
+# output_path = os.getcwd() + '/output'
+# compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path)
+# ret = self._compile_command(compile_command)
+# proc, stdnt_stderr = ret
+
+# # Only if compilation is successful, the program is executed
+# # And tested with testcases
+# if stdnt_stderr == '':
+# executable = os.getcwd() + '/executable'
+# compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path,
+# executable)
+# ret = self._compile_command(compile_main)
+# proc, main_err = ret
+# if main_err == '':
+# args = [executable]
+# ret = self._run_command(args, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdout, stderr = ret
+# if proc.returncode == 0:
+# success, err = True, "Correct answer"
+# else:
+# err = stdout + "\n" + stderr
+# os.remove(executable)
+# else:
+# err = "Error:"
+# try:
+# error_lines = main_err.splitlines()
+# for e in error_lines:
+# err = err + "\n" + e.split(":", 1)[1]
+# except:
+# err = err + "\n" + main_err
+# os.remove(output_path)
+# else:
+# err = "Compilation Error:"
+# try:
+# error_lines = stdnt_stderr.splitlines()
+# for e in error_lines:
+# if ':' in e:
+# err = err + "\n" + e.split(":", 1)[1]
+# else:
+# err = err + "\n" + e
+# except:
+# err = err + "\n" + stdnt_stderr
+# return success, err
+
+# def run_cplus_code(self, answer, test_parameter, in_dir=None):
+# """Tests given C++ code (`answer`) with the `test_code` supplied.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # The file extension must be .cpp
+# submit_f = open('submitstd.cpp', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self._check_c_cpp_code(ref_path, submit_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def run_java_code(self, answer, test_parameter, in_dir=None):
+# """Tests given java code (`answer`) with the `test_code` supplied.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # The file extension must be .java
+# # The class name and file name must be same in java
+# submit_f = open('Test.java', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self._check_java_code(ref_path, submit_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# return success, err
+
+# def _check_java_code(self, ref_code_path, submit_code_path):
+# """ Function validates student code using instructor code as
+# reference.The first argument ref_code_path, is the path to
+# instructor code, it is assumed to have executable permission.
+# The second argument submit_code_path, is the path to the student
+# code, it is assumed to have executable permission.
+
+# Returns
+# --------
+
+# returns (True, "Correct answer") : If the student function returns
+# expected output when called by reference code.
+
+# returns (False, error_msg): If the student function fails to return
+# expected output when called by reference code.
+
+# Returns (False, error_msg): If mandatory arguments are not files or
+# if the required permissions are not given to the file(s).
+
+# """
+# if not isfile(ref_code_path):
+# return False, "No file at %s" % ref_code_path
+# if not isfile(submit_code_path):
+# return False, 'No file at %s' % submit_code_path
+
+# success = False
+# compile_command = "javac %s" % (submit_code_path)
+# ret = self._compile_command(compile_command)
+# proc, stdnt_stderr = ret
+# stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
+
+# # Only if compilation is successful, the program is executed
+# # And tested with testcases
+# if stdnt_stderr == '':
+# student_directory = os.getcwd() + '/'
+# student_file_name = "Test"
+# compile_main = "javac %s -classpath %s -d %s" % (ref_code_path,
+# student_directory,
+# student_directory)
+# ret = self._compile_command(compile_main)
+# proc, main_err = ret
+# main_err = self._remove_null_substitute_char(main_err)
+
+# if main_err == '':
+# main_file_name = (ref_code_path.split('/')[-1]).split('.')[0]
+# run_command = "java -cp %s %s" % (student_directory,
+# main_file_name)
+# ret = self._run_command(run_command,
+# stdin=None,
+# shell=True,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdout, stderr = ret
+# if proc.returncode == 0:
+# success, err = True, "Correct answer"
+# else:
+# err = stdout + "\n" + stderr
+# success = False
+# os.remove("%s%s.class" % (student_directory, main_file_name))
+# else:
+# err = "Error:\n"
+# try:
+# error_lines = main_err.splitlines()
+# for e in error_lines:
+# if ':' in e:
+# err = err + "\n" + e.split(":", 1)[1]
+# else:
+# err = err + "\n" + e
+# except:
+# err = err + "\n" + main_err
+# os.remove("%s%s.class" % (student_directory, student_file_name))
+# else:
+# err = "Compilation Error:\n"
+# try:
+# error_lines = stdnt_stderr.splitlines()
+# for e in error_lines:
+# if ':' in e:
+# err = err + "\n" + e.split(":", 1)[1]
+# else:
+# err = err + "\n" + e
+# except:
+# err = err + "\n" + stdnt_stderr
+# result = {'success': success, 'error': err}
+# return result
+
+# def _remove_null_substitute_char(self, string):
+# """Returns a string without any null and substitute characters"""
+# stripped = ""
+# for c in string:
+# if ord(c) is not 26 and ord(c) is not 0:
+# stripped = stripped + c
+# return ''.join(stripped)
- def run_scilab_code(self, answer, test_parameter, in_dir=None):
- """Tests given Scilab function (`answer`) with the `test_code`
- supplied. If the optional `in_dir` keyword argument is supplied
- it changes the directory to that directory (it does not change
- it back to the original when done). This function also timesout
- when the function takes more than SERVER_TIMEOUT seconds to run
- to prevent runaway code.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
+# def run_scilab_code(self, answer, test_parameter, in_dir=None):
+# """Tests given Scilab function (`answer`) with the `test_code`
+# supplied. If the optional `in_dir` keyword argument is supplied
+# it changes the directory to that directory (it does not change
+# it back to the original when done). This function also timesout
+# when the function takes more than SERVER_TIMEOUT seconds to run
+# to prevent runaway code.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
- # Removes all the commands that terminates scilab
- answer,i = self._remove_scilab_exit(answer.lstrip())
-
- # Throw message if there are commmands that terminates scilab
- add_err=""
- if i > 0:
- add_err = "Please do not use exit, quit and abort commands in your\
- code.\n Otherwise your code will not be evaluated\
- correctly.\n"
-
- # The file extension should be .sci
- submit_f = open('function.sci','w')
- submit_f.write(answer)
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
- cmd += ' | timeout 8 scilab-cli -nb'
- ret = self._run_command(cmd,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
-
- # Get only the error.
- stderr = self._get_error(stdout)
- if stderr is None:
- # Clean output
- stdout = self._strip_output(stdout)
- if proc.returncode == 5:
- success, err = True, "Correct answer"
- else:
- err = add_err + stdout
- else:
- err = add_err + stderr
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def _remove_scilab_exit(self, string):
- """
- Removes exit, quit and abort from the scilab code
- """
- new_string = ""
- i=0
- for line in string.splitlines():
- new_line = re.sub(r"exit.*$","",line)
- new_line = re.sub(r"quit.*$","",new_line)
- new_line = re.sub(r"abort.*$","",new_line)
- if line != new_line:
- i=i+1
- new_string = new_string +'\n'+ new_line
- return new_string, i
+# # Removes all the commands that terminates scilab
+# answer,i = self._remove_scilab_exit(answer.lstrip())
+
+# # Throw message if there are commmands that terminates scilab
+# add_err=""
+# if i > 0:
+# add_err = "Please do not use exit, quit and abort commands in your\
+# code.\n Otherwise your code will not be evaluated\
+# correctly.\n"
+
+# # The file extension should be .sci
+# submit_f = open('function.sci','w')
+# submit_f.write(answer)
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
+# cmd += ' | timeout 8 scilab-cli -nb'
+# ret = self._run_command(cmd,
+# shell=True,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdout, stderr = ret
+
+# # Get only the error.
+# stderr = self._get_error(stdout)
+# if stderr is None:
+# # Clean output
+# stdout = self._strip_output(stdout)
+# if proc.returncode == 5:
+# success, err = True, "Correct answer"
+# else:
+# err = add_err + stdout
+# else:
+# err = add_err + stderr
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def _remove_scilab_exit(self, string):
+# """
+# Removes exit, quit and abort from the scilab code
+# """
+# new_string = ""
+# i=0
+# for line in string.splitlines():
+# new_line = re.sub(r"exit.*$","",line)
+# new_line = re.sub(r"quit.*$","",new_line)
+# new_line = re.sub(r"abort.*$","",new_line)
+# if line != new_line:
+# i=i+1
+# new_string = new_string +'\n'+ new_line
+# return new_string, i
def _get_error(self, string):
"""