summaryrefslogtreecommitdiff
path: root/testapp
diff options
context:
space:
mode:
authorankitjavalkar2015-03-18 11:26:38 +0530
committerankitjavalkar2015-04-26 19:43:19 +0530
commita88e59fa419d7ab31aa430e7424ff0a25169713f (patch)
treecbc350011f4e4dcdc441ddc08b79664c14b725fa /testapp
parent9440bff5ae69c1d27f5c9622ca15cb8c603c6174 (diff)
downloadonline_test-a88e59fa419d7ab31aa430e7424ff0a25169713f.tar.gz
online_test-a88e59fa419d7ab31aa430e7424ff0a25169713f.tar.bz2
online_test-a88e59fa419d7ab31aa430e7424ff0a25169713f.zip
Code server code cleanup and code commonification
- Commonify C, C++, Java and Scilab code evaluation
Diffstat (limited to 'testapp')
-rwxr-xr-xtestapp/exam/code_server.py1246
1 files changed, 389 insertions, 857 deletions
diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py
index fa5d916..2259ce8 100755
--- a/testapp/exam/code_server.py
+++ b/testapp/exam/code_server.py
@@ -85,21 +85,34 @@ class TestCode(object):
self.in_dir = in_dir
def run_code(self):
- self._change_dir(self.in_dir)
+ """Tests given code (`answer`) with the `test_code` supplied.
- # Create test cases
- test_code = self.create_test_case()
- # Evaluate, run code and obtain result
- result = self.evaluate_code(test_code)
+ The ref_code_path is a path to the reference code.
+ The reference code will call the function submitted by the student.
+ The reference code will check for the expected output.
- return result
+ If the path's start with a "/" then we assume they are absolute paths.
+ If not, we assume they are relative paths w.r.t. the location of this
+ code_server script.
+
+ If the optional `in_dir` keyword argument is supplied it changes the
+ directory to that directory (it does not change it back to the original
+ when done).
+
+ Returns
+ -------
- def evaluate_code(self, test_code):
+ A tuple: (success, error message).
+ """
success = False
prev_handler = None
+ self._change_dir(self.in_dir)
+
if self.language == "python":
tb = None
+ test_code = self.create_test_case()
+ # Add a new signal handler for the execution of this code.
prev_handler = create_delete_signal_handler("new")
try:
@@ -130,26 +143,383 @@ class TestCode(object):
# Cancel the signal
create_delete_signal_handler("delete")
- if self.language == "c++":
- pass
+ else:
+ user_answer_file = {'C': 'submit.c', 'java': 'Test.java', 'scilab': 'function.sci',
+ 'C++': 'submitstd.cpp', 'bash': 'submit.sh'}
+
+ # File extension depending on the question language
+ submit_f = open(user_answer_file.get(self.language), 'w')
+ submit_f.write(self.user_answer.lstrip())
+ submit_f.close()
+ submit_path = abspath(submit_f.name)
+ if self.language == "bash":
+ self._set_exec(submit_path)
+
+ path_list = self.ref_code_path.split(',')
+ ref_path = path_list[0].strip() if path_list[0] else ""
+ test_case_path = path_list[1].strip() if path_list[1] else ""
+ if ref_path and not ref_path.startswith('/'):
+ ref_path = join(MY_DIR, ref_path)
+ if test_case_path and not test_case_path.startswith('/'):
+ test_case_path = join(MY_DIR, test_case_path)
+
+ if self.language in ["C++", "C", "java"]:
+ # Add a new signal handler for the execution of this code.
+ prev_handler = create_delete_signal_handler("new")
+
+ # Do whatever testing needed.
+ try:
+ success, err = self._check_code(ref_path, submit_path)
+ except TimeoutException:
+ err = self.timeout_msg
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ finally:
+ # # Set back any original signal handler.
+ create_delete_signal_handler("original", prev_handler)
+
+ elif self.language == "scilab":
+ # Add a new signal handler for the execution of this code.
+ prev_handler = create_delete_signal_handler("new")
+
+ # Do whatever testing needed.
+ try:
+ cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
+ cmd += ' | timeout 8 scilab-cli -nb'
+ ret = self._run_command(cmd,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, stdout, stderr = ret
+
+ # Get only the error.
+ stderr = self._get_error(stdout)
+ if stderr is None:
+ # Clean output
+ stdout = self._strip_output(stdout)
+ if proc.returncode == 5:
+ success, err = True, "Correct answer"
+ else:
+ err = add_err + stdout
+ else:
+ err = add_err + stderr
+ except TimeoutException:
+ err = self.timeout_msg
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ finally:
+ # Set back any original signal handler.
+ create_delete_signal_handler("original", prev_handler)
+
+ elif self.language == "bash":
+ # Add a new signal handler for the execution of this code.
+ prev_handler = create_delete_signal_handler("new")
+
+ try:
+ success, err = self.check_bash_script(ref_path, submit_path,
+ test_case_path)
+ except TimeoutException:
+ err = self.timeout_msg
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ finally:
+ # Set back any original signal handler.
+ create_delete_signal_handler("original", prev_handler)
+
+ # Delete the created file.
+ os.remove(submit_path)
- if self.language == "c":
- pass
+ # Cancel the signal
+ create_delete_signal_handler("delete")
- if self.language == "java":
- pass
+ result = {'success': success, 'error': err}
+ return result
- if self.language == "scilab":
- pass
+ def _check_code(self, ref_code_path, submit_code_path):
+ """ Function validates student code using instructor code as
+ reference.The first argument ref_code_path, is the path to
+ instructor code, it is assumed to have executable permission.
+ The second argument submit_code_path, is the path to the student
+ code, it is assumed to have executable permission.
+
+ Returns
+ --------
+
+ returns (True, "Correct answer") : If the student function returns
+ expected output when called by reference code.
+
+ returns (False, error_msg): If the student function fails to return
+ expected output when called by reference code.
+
+ Returns (False, error_msg): If mandatory arguments are not files or
+ if the required permissions are not given to the file(s).
+
+ """
+
+ language_dependent_path = {
+ 'c_user_output_path': os.getcwd() + '/output',
+ 'c_ref_output_path': os.getcwd() + '/executable',
+ 'java_student_directory': os.getcwd() + '/',
+ # 'java_student_file_name': 'Test',
+ 'java_ref_file_name': (ref_code_path.split('/')[-1]).split('.')[0],
+ }
+
+ language_dependent_var = {
+ 'C': {'compile_command': 'g++ {0} -c -o {1}'.format(submit_code_path,
+ language_dependent_path.get('c_user_output_path')),
+ 'compile_main': 'g++ {0} {1} -o {2}'.format(ref_code_path,
+ language_dependent_path.get('c_user_output_path'),
+ language_dependent_path.get('c_ref_output_path')),
+ 'run_command_args': [language_dependent_path.get('c_ref_output_path')],
+ 'remove_user_output': language_dependent_path.get('c_user_output_path'),
+ 'remove_ref_output': language_dependent_path.get('c_ref_output_path')
+ },
+ 'java':{'compile_command': 'javac {0}'.format(submit_code_path),
+ 'compile_main': 'javac {0} -classpath {1} -d {2}'.format(ref_code_path,
+ language_dependent_path.get('java_student_directory'),
+ language_dependent_path.get('java_student_directory')),
+ 'run_command_args': "java -cp {0} {1}".format(
+ language_dependent_path.get('java_student_directory'),
+ language_dependent_path.get('java_ref_file_name')),
+ 'remove_user_output': "%s%s.class".format(
+ language_dependent_path.get('java_student_directory'),
+ 'Test'),
+ 'remove_ref_output': "%s%s.class".format(
+ language_dependent_path.get('java_student_directory'),
+ language_dependent_path.get('java_ref_file_name')),
+ }
+ }
+
+ if not isfile(ref_code_path):
+ return False, "No file at %s or Incorrect path" % ref_code_path
+ if not isfile(submit_code_path):
+ return False, 'No file at %s or Incorrect path' % submit_code_path
+
+ success = False
+ # output_path = os.getcwd() + '/output'
+ compile_command = language_dependent_var.get(self.language).get('compile_command') # "g++ %s -c -o %s" % (submit_code_path, output_path)
+ ret = self._compile_command(compile_command)
+ proc, stdnt_stderr = ret
+ if self.language == "java":
+ stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
+
+ # Only if compilation is successful, the program is executed
+ # And tested with testcases
+ if stdnt_stderr == '':
+ compile_main = language_dependent_var.get(self.language).get('compile_main') # "g++ %s %s -o %s" % (ref_code_path, output_path, executable)
+ ret = self._compile_command(compile_main)
+ proc, main_err = ret
+ if self.language == "java":
+ main_err = self._remove_null_substitute_char(main_err)
+
+ if main_err == '':
+ run_command_args = language_dependent_var.get(self.language).get('run_command_args')
+ ret = self._run_command(run_command_args, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, stdout, stderr = ret
+ if proc.returncode == 0:
+ success, err = True, "Correct answer"
+ else:
+ err = stdout + "\n" + stderr
+ os.remove(language_dependent_var.get(self.language).get('remove_ref_output')) # os.remove(executable)
+ else:
+ err = "Error:"
+ try:
+ error_lines = main_err.splitlines()
+ for e in error_lines:
+ if ':' in e:
+ err = err + "\n" + e.split(":", 1)[1]
+ else:
+ err = err + "\n" + e
+ except:
+ err = err + "\n" + main_err
+ os.remove(language_dependent_var.get(self.language).get('remove_user_output')) # os.remove(output_path)
+ else:
+ err = "Compilation Error:"
+ try:
+ error_lines = stdnt_stderr.splitlines()
+ for e in error_lines:
+ if ':' in e:
+ err = err + "\n" + e.split(":", 1)[1]
+ else:
+ err = err + "\n" + e
+ except:
+ err = err + "\n" + stdnt_stderr
result = {'success': success, 'error': err}
return result
- def compile_code(self):
- pass
+ def check_bash_script(self, ref_path, submit_path,
+ test_case_path=None):
+ """ Function validates student script using instructor script as
+ reference. Test cases can optionally be provided. The first argument
+ ref_path, is the path to instructor script, it is assumed to
+ have executable permission. The second argument submit_path, is
+ the path to the student script, it is assumed to have executable
+ permission. The Third optional argument is the path to test the
+ scripts. Each line in this file is a test case and each test case is
+ passed to the script as standard arguments.
+
+ Returns
+ --------
+
+ returns (True, "Correct answer") : If the student script passes all
+ test cases/have same output, when compared to the instructor script
+
+ returns (False, error_msg): If the student script fails a single
+ test/have dissimilar output, when compared to the instructor script.
+
+ Returns (False, error_msg): If mandatory arguments are not files or if
+ the required permissions are not given to the file(s).
+
+ """
+ if not isfile(ref_path):
+ return False, "No file at %s or Incorrect path" % ref_path
+ if not isfile(submit_path):
+ return False, "No file at %s or Incorrect path" % submit_path
+ if not os.access(ref_path, os.X_OK):
+ return False, "Script %s is not executable" % ref_path
+ if not os.access(submit_path, os.X_OK):
+ return False, "Script %s is not executable" % submit_path
+
+ if test_case_path is None or "":
+ ret = self._run_command(ref_path, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, inst_stdout, inst_stderr = ret
+ ret = self._run_command(submit_path, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, stdnt_stdout, stdnt_stderr = ret
+ if inst_stdout == stdnt_stdout:
+ return True, "Correct answer"
+ else:
+ err = "Error: expected %s, got %s" % (inst_stderr,
+ stdnt_stderr)
+ return False, err
+ else:
+ if not isfile(test_case_path):
+ return False, "No test case at %s" % test_case_path
+ if not os.access(ref_path, os.R_OK):
+ return False, "Test script %s, not readable" % test_case_path
+ valid_answer = True # We initially make it one, so that we can
+ # stop once a test case fails
+ loop_count = 0 # Loop count has to be greater than or
+ # equal to one.
+ # Useful for caching things like empty
+ # test files,etc.
+ test_cases = open(test_case_path).readlines()
+ num_lines = len(test_cases)
+ for test_case in test_cases:
+ loop_count += 1
+ if valid_answer:
+ args = [ref_path] + [x for x in test_case.split()]
+ ret = self._run_command(args, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, inst_stdout, inst_stderr = ret
+ args = [submit_path]+[x for x in test_case.split()]
+ ret = self._run_command(args, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, stdnt_stdout, stdnt_stderr = ret
+ valid_answer = inst_stdout == stdnt_stdout
+ if valid_answer and (num_lines == loop_count):
+ return True, "Correct answer"
+ else:
+ err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
+ stdnt_stdout+stdnt_stderr)
+ return False, err
+
+ def _run_command(self, cmd_args, *args, **kw):
+ """Run a command in a subprocess while blocking, the process is killed
+ if it takes more than 2 seconds to run. Return the Popen object, the
+ stdout and stderr.
+ """
+ try:
+ proc = subprocess.Popen(cmd_args, *args, **kw)
+ stdout, stderr = proc.communicate()
+ except TimeoutException:
+ # Runaway code, so kill it.
+ proc.kill()
+ # Re-raise exception.
+ raise
+ return proc, stdout, stderr
+
+ def _compile_command(self, cmd, *args, **kw):
+ """Compiles C/C++/java code and returns errors if any.
+ Run a command in a subprocess while blocking, the process is killed
+ if it takes more than 2 seconds to run. Return the Popen object, the
+ stderr.
+ """
+ try:
+ proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = proc_compile.communicate()
+ except TimeoutException:
+ # Runaway code, so kill it.
+ proc_compile.kill()
+ # Re-raise exception.
+ raise
+ return proc_compile, err
+
+ def _remove_null_substitute_char(self, string):
+ """Returns a string without any null and substitute characters"""
+ stripped = ""
+ for c in string:
+ if ord(c) is not 26 and ord(c) is not 0:
+ stripped = stripped + c
+ return ''.join(stripped)
+
+ def _remove_scilab_exit(self, string):
+ """
+ Removes exit, quit and abort from the scilab code
+ """
+ new_string = ""
+ i=0
+ for line in string.splitlines():
+ new_line = re.sub(r"exit.*$","",line)
+ new_line = re.sub(r"quit.*$","",new_line)
+ new_line = re.sub(r"abort.*$","",new_line)
+ if line != new_line:
+ i=i+1
+ new_string = new_string +'\n'+ new_line
+ return new_string, i
+
+ def _get_error(self, string):
+ """
+ Fetches only the error from the string.
+ Returns None if no error.
+ """
+ obj = re.search("!.+\n.+",string);
+ if obj:
+ return obj.group()
+ return None
+
+ def _strip_output(self, out):
+ """
+ Cleans whitespace from the output
+ """
+ strip_out = "Message"
+ for l in out.split('\n'):
+ if l.strip():
+ strip_out = strip_out+"\n"+l.strip()
+ return strip_out
+
+ def _set_exec(self, fname):
+ os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
+ | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
+ | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
def create_test_case(self):
- # Create assert based test cases in python
+ """
+ Create assert based test cases in python
+ """
if self.language == "python":
test_code = ""
for test_case in self.test_parameter:
@@ -163,18 +533,6 @@ class TestCode(object):
test_code += tcode + "\n"
return test_code
- if self.language == "c++":
- pass
-
- if self.language == "c":
- pass
-
- if self.language == "java":
- pass
-
- if self.language == "scilab":
- pass
-
def _change_dir(self, in_dir):
if in_dir is not None and isdir(in_dir):
os.chdir(in_dir)
@@ -198,839 +556,13 @@ class CodeServer(object):
"""Calls the TestCode Class to test the current code"""
tc = TestCode(info_parameter, in_dir)
result = tc.run_code()
-
# Put us back into the server pool queue since we are free now.
self.queue.put(self.port)
return json.dumps(result)
- def run_python_code(self, answer, test_parameter, in_dir=None):
- """Tests given Python function (`answer`) with the `test_code`
- supplied. If the optional `in_dir` keyword argument is supplied
- it changes the directory to that directory (it does not change
- it back to the original when done). This function also timesout
- when the function takes more than SERVER_TIMEOUT seconds to run
- to prevent runaway code.
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- test_parameter = json.loads(test_parameter)
- success = False
- tb = None
-
- test_code = ""
- for test_case in test_parameter:
- pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
- else ""
- kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
- if test_case.get('kw_args') else ""
- args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
- tcode = "assert {0}({1}) == {2}" \
- .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
- test_code += tcode + "\n"
- try:
- submitted = compile(answer, '<string>', mode='exec')
- g = {}
- exec submitted in g
- _tests = compile(test_code, '<string>', mode='exec')
- exec _tests in g
- except TimeoutException:
- err = self.timeout_msg
- except AssertionError:
- type, value, tb = sys.exc_info()
- info = traceback.extract_tb(tb)
- fname, lineno, func, text = info[-1]
- text = str(test_code).splitlines()[lineno-1]
- err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- else:
- success = True
- err = 'Correct answer'
- finally:
- del tb
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
-# ###############################################################################
-# # `CodeServer` class.
-# ###############################################################################
-# class CodeServer(object):
-# """A code server that executes user submitted test code, tests it and
-# reports if the code was correct or not.
-# """
-# def __init__(self, port, queue):
-# self.port = port
-# self.queue = queue
-# msg = 'Code took more than %s seconds to run. You probably '\
-# 'have an infinite loop in your code.' % SERVER_TIMEOUT
-# self.timeout_msg = msg
-
-# def run_python_code(self, answer, test_parameter, in_dir=None):
-# """Tests given Python function (`answer`) with the `test_code`
-# supplied. If the optional `in_dir` keyword argument is supplied
-# it changes the directory to that directory (it does not change
-# it back to the original when done). This function also timesout
-# when the function takes more than SERVER_TIMEOUT seconds to run
-# to prevent runaway code.
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# test_parameter = json.loads(test_parameter)
-# success = False
-# tb = None
-
-# test_code = ""
-# for test_case in test_parameter:
-# pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
-# else ""
-# kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
-# if test_case.get('kw_args') else ""
-# args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
-# tcode = "assert {0}({1}) == {2}" \
-# .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
-# test_code += tcode + "\n"
-# try:
-# submitted = compile(answer, '<string>', mode='exec')
-# g = {}
-# exec submitted in g
-# _tests = compile(test_code, '<string>', mode='exec')
-# exec _tests in g
-# except TimeoutException:
-# err = self.timeout_msg
-# except AssertionError:
-# type, value, tb = sys.exc_info()
-# info = traceback.extract_tb(tb)
-# fname, lineno, func, text = info[-1]
-# text = str(test_code).splitlines()[lineno-1]
-# err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# else:
-# success = True
-# err = 'Correct answer'
-# finally:
-# del tb
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def run_bash_code(self, answer, test_parameter, in_dir=None):
-# """Tests given Bash code (`answer`) with the `test_code` supplied.
-
-# The testcode should typically contain two lines, the first is a path to
-# the reference script we are to compare against. The second is a path
-# to the arguments to be supplied to the reference and submitted script.
-# The output of these will be compared for correctness.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# If the optional `in_dir` keyword argument is supplied it changes the
-# directory to that directory (it does not change it back to the original
-# when done).
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# def _set_exec(fname):
-# os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
-# | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
-# | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
-# submit_f = open('submit.sh', 'w')
-# submit_f.write(answer.lstrip())
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-# _set_exec(submit_path)
-
-# # ref path and path to arguments is a comma seperated string obtained
-# #from ref_code_path field in TestCase Mode
-# path_list = test_parameter.get('ref_code_path').split(',')
-# ref_path = path_list[0].strip() if path_list[0] else ""
-# test_case_path = path_list[1].strip() if path_list[1] else ""
-
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-# if not test_case_path.startswith('/'):
-# test_case_path = join(MY_DIR, test_case_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# success, err = self.check_bash_script(ref_path, submit_path,
-# test_case_path)
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def _run_command(self, cmd_args, *args, **kw):
-# """Run a command in a subprocess while blocking, the process is killed
-# if it takes more than 2 seconds to run. Return the Popen object, the
-# stdout and stderr.
-# """
-# try:
-# proc = subprocess.Popen(cmd_args, *args, **kw)
-# stdout, stderr = proc.communicate()
-# except TimeoutException:
-# # Runaway code, so kill it.
-# proc.kill()
-# # Re-raise exception.
-# raise
-# return proc, stdout, stderr
-
-# def check_bash_script(self, ref_script_path, submit_script_path,
-# test_case_path=None):
-# """ Function validates student script using instructor script as
-# reference. Test cases can optionally be provided. The first argument
-# ref_script_path, is the path to instructor script, it is assumed to
-# have executable permission. The second argument submit_script_path, is
-# the path to the student script, it is assumed to have executable
-# permission. The Third optional argument is the path to test the
-# scripts. Each line in this file is a test case and each test case is
-# passed to the script as standard arguments.
-
-# Returns
-# --------
-
-# returns (True, "Correct answer") : If the student script passes all
-# test cases/have same output, when compared to the instructor script
-
-# returns (False, error_msg): If the student script fails a single
-# test/have dissimilar output, when compared to the instructor script.
-
-# Returns (False, error_msg): If mandatory arguments are not files or if
-# the required permissions are not given to the file(s).
-
-# """
-# if not ref_script_path:
-# return False, "No Test Script path found"
-# if not isfile(ref_script_path):
-# return False, "No file at %s" % ref_script_path
-# if not isfile(submit_script_path):
-# return False, 'No file at %s' % submit_script_path
-# if not os.access(ref_script_path, os.X_OK):
-# return False, 'Script %s is not executable' % ref_script_path
-# if not os.access(submit_script_path, os.X_OK):
-# return False, 'Script %s is not executable' % submit_script_path
-
-# if test_case_path is None or "":
-# ret = self._run_command(ref_script_path, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, inst_stdout, inst_stderr = ret
-# ret = self._run_command(submit_script_path, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdnt_stdout, stdnt_stderr = ret
-# if inst_stdout == stdnt_stdout:
-# return True, 'Correct answer'
-# else:
-# err = "Error: expected %s, got %s" % (inst_stderr,
-# stdnt_stderr)
-# return False, err
-# else:
-# if not isfile(test_case_path):
-# return False, "No test case at %s" % test_case_path
-# if not os.access(ref_script_path, os.R_OK):
-# return False, "Test script %s, not readable" % test_case_path
-# valid_answer = True # We initially make it one, so that we can
-# # stop once a test case fails
-# loop_count = 0 # Loop count has to be greater than or
-# # equal to one.
-# # Useful for caching things like empty
-# # test files,etc.
-# test_cases = open(test_case_path).readlines()
-# num_lines = len(test_cases)
-# for test_case in test_cases:
-# loop_count += 1
-# if valid_answer:
-# args = [ref_script_path] + [x for x in test_case.split()]
-# ret = self._run_command(args, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, inst_stdout, inst_stderr = ret
-# args = [submit_script_path]+[x for x in test_case.split()]
-# ret = self._run_command(args, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdnt_stdout, stdnt_stderr = ret
-# valid_answer = inst_stdout == stdnt_stdout
-# if valid_answer and (num_lines == loop_count):
-# return True, "Correct answer"
-# else:
-# err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
-# stdnt_stdout+stdnt_stderr)
-# return False, err
-
-# def run_c_code(self, answer, test_parameter, in_dir=None):
-# """Tests given C code (`answer`) with the `test_code` supplied.
-
-# The testcode is a path to the reference code.
-# The reference code will call the function submitted by the student.
-# The reference code will check for the expected output.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# If the optional `in_dir` keyword argument is supplied it changes the
-# directory to that directory (it does not change it back to the original
-# when done).
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # File extension must be .c
-# submit_f = open('submit.c', 'w')
-# submit_f.write(answer.lstrip())
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-
-# ref_path = test_parameter.get('ref_code_path').strip()
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# success, err = self._check_c_cpp_code(ref_path, submit_path)
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def _compile_command(self, cmd, *args, **kw):
-# """Compiles C/C++/java code and returns errors if any.
-# Run a command in a subprocess while blocking, the process is killed
-# if it takes more than 2 seconds to run. Return the Popen object, the
-# stderr.
-# """
-# try:
-# proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# out, err = proc_compile.communicate()
-# except TimeoutException:
-# # Runaway code, so kill it.
-# proc_compile.kill()
-# # Re-raise exception.
-# raise
-# return proc_compile, err
-
-# def _check_c_cpp_code(self, ref_code_path, submit_code_path):
-# """ Function validates student code using instructor code as
-# reference.The first argument ref_code_path, is the path to
-# instructor code, it is assumed to have executable permission.
-# The second argument submit_code_path, is the path to the student
-# code, it is assumed to have executable permission.
-
-# Returns
-# --------
-
-# returns (True, "Correct answer") : If the student function returns
-# expected output when called by reference code.
-
-# returns (False, error_msg): If the student function fails to return
-# expected output when called by reference code.
-
-# Returns (False, error_msg): If mandatory arguments are not files or
-# if the required permissions are not given to the file(s).
-
-# """
-# if not isfile(ref_code_path):
-# return False, "No file at %s" % ref_code_path
-# if not isfile(submit_code_path):
-# return False, 'No file at %s' % submit_code_path
-
-# success = False
-# output_path = os.getcwd() + '/output'
-# compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path)
-# ret = self._compile_command(compile_command)
-# proc, stdnt_stderr = ret
-
-# # Only if compilation is successful, the program is executed
-# # And tested with testcases
-# if stdnt_stderr == '':
-# executable = os.getcwd() + '/executable'
-# compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path,
-# executable)
-# ret = self._compile_command(compile_main)
-# proc, main_err = ret
-# if main_err == '':
-# args = [executable]
-# ret = self._run_command(args, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdout, stderr = ret
-# if proc.returncode == 0:
-# success, err = True, "Correct answer"
-# else:
-# err = stdout + "\n" + stderr
-# os.remove(executable)
-# else:
-# err = "Error:"
-# try:
-# error_lines = main_err.splitlines()
-# for e in error_lines:
-# err = err + "\n" + e.split(":", 1)[1]
-# except:
-# err = err + "\n" + main_err
-# os.remove(output_path)
-# else:
-# err = "Compilation Error:"
-# try:
-# error_lines = stdnt_stderr.splitlines()
-# for e in error_lines:
-# if ':' in e:
-# err = err + "\n" + e.split(":", 1)[1]
-# else:
-# err = err + "\n" + e
-# except:
-# err = err + "\n" + stdnt_stderr
-# return success, err
-
-# def run_cplus_code(self, answer, test_parameter, in_dir=None):
-# """Tests given C++ code (`answer`) with the `test_code` supplied.
-
-# The testcode is a path to the reference code.
-# The reference code will call the function submitted by the student.
-# The reference code will check for the expected output.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# If the optional `in_dir` keyword argument is supplied it changes the
-# directory to that directory (it does not change it back to the original
-# when done).
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # The file extension must be .cpp
-# submit_f = open('submitstd.cpp', 'w')
-# submit_f.write(answer.lstrip())
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-
-# ref_path = test_parameter.get('ref_code_path').strip()
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# success, err = self._check_c_cpp_code(ref_path, submit_path)
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def run_java_code(self, answer, test_parameter, in_dir=None):
-# """Tests given java code (`answer`) with the `test_code` supplied.
-
-# The testcode is a path to the reference code.
-# The reference code will call the function submitted by the student.
-# The reference code will check for the expected output.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# If the optional `in_dir` keyword argument is supplied it changes the
-# directory to that directory (it does not change it back to the original
-# when done).
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # The file extension must be .java
-# # The class name and file name must be same in java
-# submit_f = open('Test.java', 'w')
-# submit_f.write(answer.lstrip())
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-
-# ref_path = test_parameter.get('ref_code_path').strip()
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# success, err = self._check_java_code(ref_path, submit_path)
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# return success, err
-
-# def _check_java_code(self, ref_code_path, submit_code_path):
-# """ Function validates student code using instructor code as
-# reference.The first argument ref_code_path, is the path to
-# instructor code, it is assumed to have executable permission.
-# The second argument submit_code_path, is the path to the student
-# code, it is assumed to have executable permission.
-
-# Returns
-# --------
-
-# returns (True, "Correct answer") : If the student function returns
-# expected output when called by reference code.
-
-# returns (False, error_msg): If the student function fails to return
-# expected output when called by reference code.
-
-# Returns (False, error_msg): If mandatory arguments are not files or
-# if the required permissions are not given to the file(s).
-
-# """
-# if not isfile(ref_code_path):
-# return False, "No file at %s" % ref_code_path
-# if not isfile(submit_code_path):
-# return False, 'No file at %s' % submit_code_path
-
-# success = False
-# compile_command = "javac %s" % (submit_code_path)
-# ret = self._compile_command(compile_command)
-# proc, stdnt_stderr = ret
-# stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
-
-# # Only if compilation is successful, the program is executed
-# # And tested with testcases
-# if stdnt_stderr == '':
-# student_directory = os.getcwd() + '/'
-# student_file_name = "Test"
-# compile_main = "javac %s -classpath %s -d %s" % (ref_code_path,
-# student_directory,
-# student_directory)
-# ret = self._compile_command(compile_main)
-# proc, main_err = ret
-# main_err = self._remove_null_substitute_char(main_err)
-
-# if main_err == '':
-# main_file_name = (ref_code_path.split('/')[-1]).split('.')[0]
-# run_command = "java -cp %s %s" % (student_directory,
-# main_file_name)
-# ret = self._run_command(run_command,
-# stdin=None,
-# shell=True,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdout, stderr = ret
-# if proc.returncode == 0:
-# success, err = True, "Correct answer"
-# else:
-# err = stdout + "\n" + stderr
-# success = False
-# os.remove("%s%s.class" % (student_directory, main_file_name))
-# else:
-# err = "Error:\n"
-# try:
-# error_lines = main_err.splitlines()
-# for e in error_lines:
-# if ':' in e:
-# err = err + "\n" + e.split(":", 1)[1]
-# else:
-# err = err + "\n" + e
-# except:
-# err = err + "\n" + main_err
-# os.remove("%s%s.class" % (student_directory, student_file_name))
-# else:
-# err = "Compilation Error:\n"
-# try:
-# error_lines = stdnt_stderr.splitlines()
-# for e in error_lines:
-# if ':' in e:
-# err = err + "\n" + e.split(":", 1)[1]
-# else:
-# err = err + "\n" + e
-# except:
-# err = err + "\n" + stdnt_stderr
-# result = {'success': success, 'error': err}
-# return result
-
-# def _remove_null_substitute_char(self, string):
-# """Returns a string without any null and substitute characters"""
-# stripped = ""
-# for c in string:
-# if ord(c) is not 26 and ord(c) is not 0:
-# stripped = stripped + c
-# return ''.join(stripped)
-
-# def run_scilab_code(self, answer, test_parameter, in_dir=None):
-# """Tests given Scilab function (`answer`) with the `test_code`
-# supplied. If the optional `in_dir` keyword argument is supplied
-# it changes the directory to that directory (it does not change
-# it back to the original when done). This function also timesout
-# when the function takes more than SERVER_TIMEOUT seconds to run
-# to prevent runaway code.
-
-# The testcode is a path to the reference code.
-# The reference code will call the function submitted by the student.
-# The reference code will check for the expected output.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # Removes all the commands that terminates scilab
-# answer,i = self._remove_scilab_exit(answer.lstrip())
-
-# # Throw message if there are commmands that terminates scilab
-# add_err=""
-# if i > 0:
-# add_err = "Please do not use exit, quit and abort commands in your\
-# code.\n Otherwise your code will not be evaluated\
-# correctly.\n"
-
-# # The file extension should be .sci
-# submit_f = open('function.sci','w')
-# submit_f.write(answer)
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-
-# ref_path = test_parameter.get('ref_code_path').strip()
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
-# cmd += ' | timeout 8 scilab-cli -nb'
-# ret = self._run_command(cmd,
-# shell=True,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdout, stderr = ret
-
-# # Get only the error.
-# stderr = self._get_error(stdout)
-# if stderr is None:
-# # Clean output
-# stdout = self._strip_output(stdout)
-# if proc.returncode == 5:
-# success, err = True, "Correct answer"
-# else:
-# err = add_err + stdout
-# else:
-# err = add_err + stderr
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def _remove_scilab_exit(self, string):
-# """
-# Removes exit, quit and abort from the scilab code
-# """
-# new_string = ""
-# i=0
-# for line in string.splitlines():
-# new_line = re.sub(r"exit.*$","",line)
-# new_line = re.sub(r"quit.*$","",new_line)
-# new_line = re.sub(r"abort.*$","",new_line)
-# if line != new_line:
-# i=i+1
-# new_string = new_string +'\n'+ new_line
-# return new_string, i
-
- def _get_error(self, string):
- """
- Fetches only the error from the string.
- Returns None if no error.
- """
- obj = re.search("!.+\n.+",string);
- if obj:
- return obj.group()
- return None
-
- def _strip_output(self, out):
- """
- Cleans whitespace from the output
- """
- strip_out = "Message"
- for l in out.split('\n'):
- if l.strip():
- strip_out = strip_out+"\n"+l.strip()
- return strip_out
-
def run(self):
- """Run XMLRPC server, serving our methods.
- """
+ """Run XMLRPC server, serving our methods."""
server = SimpleXMLRPCServer(("localhost", self.port))
self.server = server
server.register_instance(self)