diff options
Diffstat (limited to 'testapp/exam/evaluators')
-rw-r--r-- | testapp/exam/evaluators/__init__.py | 0 | ||||
-rw-r--r-- | testapp/exam/evaluators/bash_code_evaluator.py | 122 | ||||
-rw-r--r-- | testapp/exam/evaluators/code_evaluator.py | 206 | ||||
-rw-r--r-- | testapp/exam/evaluators/cpp_code_evaluator.py | 125 | ||||
-rw-r--r-- | testapp/exam/evaluators/java_code_evaluator.py | 128 | ||||
-rw-r--r-- | testapp/exam/evaluators/language_registry.py | 36 | ||||
-rw-r--r-- | testapp/exam/evaluators/python_code_evaluator.py | 61 | ||||
-rw-r--r-- | testapp/exam/evaluators/scilab_code_evaluator.py | 105 |
8 files changed, 0 insertions, 783 deletions
diff --git a/testapp/exam/evaluators/__init__.py b/testapp/exam/evaluators/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/testapp/exam/evaluators/__init__.py +++ /dev/null diff --git a/testapp/exam/evaluators/bash_code_evaluator.py b/testapp/exam/evaluators/bash_code_evaluator.py deleted file mode 100644 index a468fd7..0000000 --- a/testapp/exam/evaluators/bash_code_evaluator.py +++ /dev/null @@ -1,122 +0,0 @@ -#!/usr/bin/env python -import traceback -import pwd -import os -from os.path import join, isfile -import subprocess -import importlib - -# local imports -from code_evaluator import CodeEvaluator - - -class BashCodeEvaluator(CodeEvaluator): - """Tests the Bash code obtained from Code Server""" - def __init__(self, test_case_data, test, language, user_answer, - ref_code_path=None, in_dir=None): - super(BashCodeEvaluator, self).__init__(test_case_data, test, language, user_answer, - ref_code_path, in_dir) - self.test_case_args = self._setup() - - # Private Protocol ########## - def _setup(self): - super(BashCodeEvaluator, self)._setup() - - self.submit_path = self.create_submit_code_file('submit.sh') - self._set_file_as_executable(self.submit_path) - get_ref_path, get_test_case_path = self.ref_code_path.strip().split(',') - get_ref_path = get_ref_path.strip() - get_test_case_path = get_test_case_path.strip() - ref_path, test_case_path = self._set_test_code_file_path(get_ref_path, - get_test_case_path) - - return ref_path, self.submit_path, test_case_path - - def _teardown(self): - # Delete the created file. - super(BashCodeEvaluator, self)._teardown() - os.remove(self.submit_path) - - def _check_code(self, ref_path, submit_path, - test_case_path=None): - """ Function validates student script using instructor script as - reference. Test cases can optionally be provided. The first argument - ref_path, is the path to instructor script, it is assumed to - have executable permission. The second argument submit_path, is - the path to the student script, it is assumed to have executable - permission. The Third optional argument is the path to test the - scripts. Each line in this file is a test case and each test case is - passed to the script as standard arguments. - - Returns - -------- - - returns (True, "Correct answer") : If the student script passes all - test cases/have same output, when compared to the instructor script - - returns (False, error_msg): If the student script fails a single - test/have dissimilar output, when compared to the instructor script. - - Returns (False, error_msg): If mandatory arguments are not files or if - the required permissions are not given to the file(s). - - """ - if not isfile(ref_path): - return False, "No file at %s or Incorrect path" % ref_path - if not isfile(submit_path): - return False, "No file at %s or Incorrect path" % submit_path - if not os.access(ref_path, os.X_OK): - return False, "Script %s is not executable" % ref_path - if not os.access(submit_path, os.X_OK): - return False, "Script %s is not executable" % submit_path - - success = False - - if test_case_path is None or "": - ret = self._run_command(ref_path, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, inst_stdout, inst_stderr = ret - ret = self._run_command(submit_path, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdnt_stdout, stdnt_stderr = ret - if inst_stdout == stdnt_stdout: - return True, "Correct answer" - else: - err = "Error: expected %s, got %s" % (inst_stderr, - stdnt_stderr) - return False, err - else: - if not isfile(test_case_path): - return False, "No test case at %s" % test_case_path - if not os.access(ref_path, os.R_OK): - return False, "Test script %s, not readable" % test_case_path - # valid_answer is True, so that we can stop once a test case fails - valid_answer = True - # loop_count has to be greater than or equal to one. - # Useful for caching things like empty test files,etc. - loop_count = 0 - test_cases = open(test_case_path).readlines() - num_lines = len(test_cases) - for test_case in test_cases: - loop_count += 1 - if valid_answer: - args = [ref_path] + [x for x in test_case.split()] - ret = self._run_command(args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, inst_stdout, inst_stderr = ret - args = [submit_path]+[x for x in test_case.split()] - ret = self._run_command(args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdnt_stdout, stdnt_stderr = ret - valid_answer = inst_stdout == stdnt_stdout - if valid_answer and (num_lines == loop_count): - return True, "Correct answer" - else: - err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr, - stdnt_stdout+stdnt_stderr) - return False, err - diff --git a/testapp/exam/evaluators/code_evaluator.py b/testapp/exam/evaluators/code_evaluator.py deleted file mode 100644 index 381b2e8..0000000 --- a/testapp/exam/evaluators/code_evaluator.py +++ /dev/null @@ -1,206 +0,0 @@ -import sys -from SimpleXMLRPCServer import SimpleXMLRPCServer -import pwd -import os -import stat -from os.path import isdir, dirname, abspath, join, isfile -import signal -from multiprocessing import Process, Queue -import subprocess -import re -import json -# Local imports. -from settings import SERVER_TIMEOUT - - -MY_DIR = abspath(dirname(__file__)) - - -# Raised when the code times-out. -# c.f. http://pguides.net/python/timeout-a-function -class TimeoutException(Exception): - pass - - -def timeout_handler(signum, frame): - """A handler for the ALARM signal.""" - raise TimeoutException('Code took too long to run.') - - -def create_signal_handler(): - """Add a new signal handler for the execution of this code.""" - prev_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(SERVER_TIMEOUT) - return prev_handler - - -def set_original_signal_handler(old_handler=None): - """Set back any original signal handler.""" - if old_handler is not None: - signal.signal(signal.SIGALRM, old_handler) - return - else: - raise Exception("Signal Handler: object cannot be NoneType") - - -def delete_signal_handler(): - signal.alarm(0) - return - - -class CodeEvaluator(object): - """Tests the code obtained from Code Server""" - def __init__(self, test_case_data, test, language, user_answer, - ref_code_path=None, in_dir=None): - msg = 'Code took more than %s seconds to run. You probably '\ - 'have an infinite loop in your code.' % SERVER_TIMEOUT - self.timeout_msg = msg - self.test_case_data = test_case_data - self.language = language.lower() - self.user_answer = user_answer - self.ref_code_path = ref_code_path - self.test = test - self.in_dir = in_dir - self.test_case_args = None - - # Public Protocol ########## - @classmethod - def from_json(cls, language, json_data, in_dir): - json_data = json.loads(json_data) - test_case_data = json_data.get("test_case_data") - user_answer = json_data.get("user_answer") - ref_code_path = json_data.get("ref_code_path") - test = json_data.get("test") - - instance = cls(test_case_data, test, language, user_answer, ref_code_path, - in_dir) - return instance - - def evaluate(self): - """Evaluates given code with the test cases based on - given arguments in test_case_data. - - The ref_code_path is a path to the reference code. - The reference code will call the function submitted by the student. - The reference code will check for the expected output. - - If the path's start with a "/" then we assume they are absolute paths. - If not, we assume they are relative paths w.r.t. the location of this - code_server script. - - If the optional `in_dir` keyword argument is supplied it changes the - directory to that directory (it does not change it back to the original - when done). - - Returns - ------- - - A tuple: (success, error message). - """ - - self._setup() - success, err = self._evaluate(self.test_case_args) - self._teardown() - - result = {'success': success, 'error': err} - return result - - # Private Protocol ########## - def _setup(self): - self._change_dir(self.in_dir) - - def _evaluate(self, args): - # Add a new signal handler for the execution of this code. - prev_handler = create_signal_handler() - success = False - args = args or [] - - # Do whatever testing needed. - try: - success, err = self._check_code(*args) - - except TimeoutException: - err = self.timeout_msg - except: - _type, value = sys.exc_info()[:2] - err = "Error: {0}".format(repr(value)) - finally: - # Set back any original signal handler. - set_original_signal_handler(prev_handler) - - return success, err - - def _teardown(self): - # Cancel the signal - delete_signal_handler() - - def _check_code(self): - raise NotImplementedError("check_code method not implemented") - - def create_submit_code_file(self, file_name): - """ Write the code (`answer`) to a file and set the file path""" - submit_f = open(file_name, 'w') - submit_f.write(self.user_answer.lstrip()) - submit_f.close() - submit_path = abspath(submit_f.name) - - return submit_path - - def _set_file_as_executable(self, fname): - os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR - | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP - | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) - - def _set_test_code_file_path(self, ref_path=None, test_case_path=None): - if ref_path and not ref_path.startswith('/'): - ref_path = join(MY_DIR, ref_path) - - if test_case_path and not test_case_path.startswith('/'): - test_case_path = join(MY_DIR, test_case_path) - - return ref_path, test_case_path - - def _run_command(self, cmd_args, *args, **kw): - """Run a command in a subprocess while blocking, the process is killed - if it takes more than 2 seconds to run. Return the Popen object, the - stdout and stderr. - """ - try: - proc = subprocess.Popen(cmd_args, *args, **kw) - stdout, stderr = proc.communicate() - except TimeoutException: - # Runaway code, so kill it. - proc.kill() - # Re-raise exception. - raise - return proc, stdout, stderr - - def _compile_command(self, cmd, *args, **kw): - """Compiles C/C++/java code and returns errors if any. - Run a command in a subprocess while blocking, the process is killed - if it takes more than 2 seconds to run. Return the Popen object, the - stderr. - """ - try: - proc_compile = subprocess.Popen(cmd, shell=True, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out, err = proc_compile.communicate() - except TimeoutException: - # Runaway code, so kill it. - proc_compile.kill() - # Re-raise exception. - raise - return proc_compile, err - - def _change_dir(self, in_dir): - if in_dir is not None and isdir(in_dir): - os.chdir(in_dir) - - def _remove_null_substitute_char(self, string): - """Returns a string without any null and substitute characters""" - stripped = "" - for c in string: - if ord(c) is not 26 and ord(c) is not 0: - stripped = stripped + c - return ''.join(stripped) diff --git a/testapp/exam/evaluators/cpp_code_evaluator.py b/testapp/exam/evaluators/cpp_code_evaluator.py deleted file mode 100644 index 7242884..0000000 --- a/testapp/exam/evaluators/cpp_code_evaluator.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python -import traceback -import pwd -import os -from os.path import join, isfile -import subprocess -import importlib - -# local imports -from code_evaluator import CodeEvaluator - - -class CppCodeEvaluator(CodeEvaluator): - """Tests the C code obtained from Code Server""" - def __init__(self, test_case_data, test, language, user_answer, - ref_code_path=None, in_dir=None): - super(CppCodeEvaluator, self).__init__(test_case_data, test, language, - user_answer, ref_code_path, - in_dir) - self.test_case_args = self._setup() - - # Private Protocol ########## - def _setup(self): - super(CppCodeEvaluator, self)._setup() - - get_ref_path = self.ref_code_path - ref_path, test_case_path = self._set_test_code_file_path(get_ref_path) - self.submit_path = self.create_submit_code_file('submit.c') - - # Set file paths - c_user_output_path = os.getcwd() + '/output' - c_ref_output_path = os.getcwd() + '/executable' - - # Set command variables - compile_command = 'g++ {0} -c -o {1}'.format(self.submit_path, - c_user_output_path) - compile_main = 'g++ {0} {1} -o {2}'.format(ref_path, - c_user_output_path, - c_ref_output_path) - run_command_args = [c_ref_output_path] - remove_user_output = c_user_output_path - remove_ref_output = c_ref_output_path - - return (ref_path, self.submit_path, compile_command, compile_main, - run_command_args, remove_user_output, remove_ref_output) - - def _teardown(self): - # Delete the created file. - super(CppCodeEvaluator, self)._teardown() - os.remove(self.submit_path) - - def _check_code(self, ref_code_path, submit_code_path, compile_command, - compile_main, run_command_args, remove_user_output, - remove_ref_output): - """ Function validates student code using instructor code as - reference.The first argument ref_code_path, is the path to - instructor code, it is assumed to have executable permission. - The second argument submit_code_path, is the path to the student - code, it is assumed to have executable permission. - - Returns - -------- - - returns (True, "Correct answer") : If the student function returns - expected output when called by reference code. - - returns (False, error_msg): If the student function fails to return - expected output when called by reference code. - - Returns (False, error_msg): If mandatory arguments are not files or - if the required permissions are not given to the file(s). - - """ - if not isfile(ref_code_path): - return False, "No file at %s or Incorrect path" % ref_code_path - if not isfile(submit_code_path): - return False, 'No file at %s or Incorrect path' % submit_code_path - - success = False - ret = self._compile_command(compile_command) - proc, stdnt_stderr = ret - stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr) - - # Only if compilation is successful, the program is executed - # And tested with testcases - if stdnt_stderr == '': - ret = self._compile_command(compile_main) - proc, main_err = ret - main_err = self._remove_null_substitute_char(main_err) - - if main_err == '': - ret = self._run_command(run_command_args, stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - if proc.returncode == 0: - success, err = True, "Correct answer" - else: - err = stdout + "\n" + stderr - os.remove(remove_ref_output) - else: - err = "Error:" - try: - error_lines = main_err.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + main_err - os.remove(remove_user_output) - else: - err = "Compilation Error:" - try: - error_lines = stdnt_stderr.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + stdnt_stderr - - return success, err diff --git a/testapp/exam/evaluators/java_code_evaluator.py b/testapp/exam/evaluators/java_code_evaluator.py deleted file mode 100644 index 4367259..0000000 --- a/testapp/exam/evaluators/java_code_evaluator.py +++ /dev/null @@ -1,128 +0,0 @@ -#!/usr/bin/env python -import traceback -import pwd -import os -from os.path import join, isfile -import subprocess -import importlib - -# local imports -from code_evaluator import CodeEvaluator - - -class JavaCodeEvaluator(CodeEvaluator): - """Tests the Java code obtained from Code Server""" - def __init__(self, test_case_data, test, language, user_answer, - ref_code_path=None, in_dir=None): - super(JavaCodeEvaluator, self).__init__(test_case_data, test, - language, user_answer, - ref_code_path, in_dir) - self.test_case_args = self._setup() - - # Private Protocol ########## - def _setup(self): - super(JavaCodeEvaluator, self)._setup() - - ref_path, test_case_path = self._set_test_code_file_path(self.ref_code_path) - self.submit_path = self.create_submit_code_file('Test.java') - - # Set file paths - java_student_directory = os.getcwd() + '/' - java_ref_file_name = (ref_path.split('/')[-1]).split('.')[0] - - # Set command variables - compile_command = 'javac {0}'.format(self.submit_path), - compile_main = ('javac {0} -classpath ' - '{1} -d {2}').format(ref_path, - java_student_directory, - java_student_directory) - run_command_args = "java -cp {0} {1}".format(java_student_directory, - java_ref_file_name) - remove_user_output = "{0}{1}.class".format(java_student_directory, - 'Test') - remove_ref_output = "{0}{1}.class".format(java_student_directory, - java_ref_file_name) - - return (ref_path, self.submit_path, compile_command, compile_main, - run_command_args, remove_user_output, remove_ref_output) - - def _teardown(self): - # Delete the created file. - super(JavaCodeEvaluator, self)._teardown() - os.remove(self.submit_path) - - def _check_code(self, ref_code_path, submit_code_path, compile_command, - compile_main, run_command_args, remove_user_output, - remove_ref_output): - """ Function validates student code using instructor code as - reference.The first argument ref_code_path, is the path to - instructor code, it is assumed to have executable permission. - The second argument submit_code_path, is the path to the student - code, it is assumed to have executable permission. - - Returns - -------- - - returns (True, "Correct answer") : If the student function returns - expected output when called by reference code. - - returns (False, error_msg): If the student function fails to return - expected output when called by reference code. - - Returns (False, error_msg): If mandatory arguments are not files or - if the required permissions are not given to the file(s). - - """ - if not isfile(ref_code_path): - return False, "No file at %s or Incorrect path" % ref_code_path - if not isfile(submit_code_path): - return False, 'No file at %s or Incorrect path' % submit_code_path - - success = False - ret = self._compile_command(compile_command) - proc, stdnt_stderr = ret - stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr) - - # Only if compilation is successful, the program is executed - # And tested with testcases - if stdnt_stderr == '': - ret = self._compile_command(compile_main) - proc, main_err = ret - main_err = self._remove_null_substitute_char(main_err) - - if main_err == '': - ret = self._run_command(run_command_args, shell=True, - stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - if proc.returncode == 0: - success, err = True, "Correct answer" - else: - err = stdout + "\n" + stderr - os.remove(remove_ref_output) - else: - err = "Error:" - try: - error_lines = main_err.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + main_err - os.remove(remove_user_output) - else: - err = "Compilation Error:" - try: - error_lines = stdnt_stderr.splitlines() - for e in error_lines: - if ':' in e: - err = err + "\n" + e.split(":", 1)[1] - else: - err = err + "\n" + e - except: - err = err + "\n" + stdnt_stderr - - return success, err diff --git a/testapp/exam/evaluators/language_registry.py b/testapp/exam/evaluators/language_registry.py deleted file mode 100644 index 76a23d7..0000000 --- a/testapp/exam/evaluators/language_registry.py +++ /dev/null @@ -1,36 +0,0 @@ -from settings import code_evaluators -import importlib - -registry = None - -def set_registry(): - global registry - registry = _LanguageRegistry() - -def get_registry(): - return registry - -class _LanguageRegistry(object): - def __init__(self): - self._register = {} - for language, module in code_evaluators.iteritems(): - self._register[language] = None - - # Public Protocol ########## - def get_class(self, language): - """ Get the code evaluator class for the given language """ - if not self._register.get(language): - self._register[language] = code_evaluators.get(language) - - cls = self._register[language] - module_name, class_name = cls.rsplit(".", 1) - # load the module, will raise ImportError if module cannot be loaded - get_module = importlib.import_module(module_name) - # get the class, will raise AttributeError if class cannot be found - get_class = getattr(get_module, class_name) - return get_class - - def register(self, language, class_name): - """ Register a new code evaluator class for language""" - self._register[language] = class_name - diff --git a/testapp/exam/evaluators/python_code_evaluator.py b/testapp/exam/evaluators/python_code_evaluator.py deleted file mode 100644 index 0c473cf..0000000 --- a/testapp/exam/evaluators/python_code_evaluator.py +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env python -import sys -import traceback -import os -from os.path import join -import importlib - -# local imports -from code_evaluator import CodeEvaluator - - -class PythonCodeEvaluator(CodeEvaluator): - """Tests the Python code obtained from Code Server""" - # Private Protocol ########## - def _check_code(self): - success = False - - try: - tb = None - test_code = self._create_test_case() - submitted = compile(self.user_answer, '<string>', mode='exec') - g = {} - exec submitted in g - _tests = compile(test_code, '<string>', mode='exec') - exec _tests in g - except AssertionError: - type, value, tb = sys.exc_info() - info = traceback.extract_tb(tb) - fname, lineno, func, text = info[-1] - text = str(test_code).splitlines()[lineno-1] - err = "{0} {1} in: {2}".format(type.__name__, str(value), text) - else: - success = True - err = 'Correct answer' - - del tb - return success, err - - def _create_test_case(self): - """ - Create assert based test cases in python - """ - test_code = "" - if self.test: - return self.test - elif self.test_case_data: - for test_case in self.test_case_data: - pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) \ - if test_case.get('pos_args') else "" - kw_args = ", ".join(str(k+"="+a) for k, a - in test_case.get('kw_args').iteritems()) \ - if test_case.get('kw_args') else "" - args = pos_args + ", " + kw_args if pos_args and kw_args \ - else pos_args or kw_args - function_name = test_case.get('func_name') - expected_answer = test_case.get('expected_answer') - - tcode = "assert {0}({1}) == {2}".format(function_name, args, - expected_answer) - test_code += tcode + "\n" - return test_code diff --git a/testapp/exam/evaluators/scilab_code_evaluator.py b/testapp/exam/evaluators/scilab_code_evaluator.py deleted file mode 100644 index 392cd45..0000000 --- a/testapp/exam/evaluators/scilab_code_evaluator.py +++ /dev/null @@ -1,105 +0,0 @@ -#!/usr/bin/env python -import traceback -import os -from os.path import join, isfile -import subprocess -import re -import importlib - -# local imports -from code_evaluator import CodeEvaluator - - -class ScilabCodeEvaluator(CodeEvaluator): - """Tests the Scilab code obtained from Code Server""" - def __init__(self, test_case_data, test, language, user_answer, - ref_code_path=None, in_dir=None): - super(ScilabCodeEvaluator, self).__init__(test_case_data, test, - language, user_answer, - ref_code_path, in_dir) - - # Removes all the commands that terminates scilab - self.user_answer, self.terminate_commands = self._remove_scilab_exit(user_answer.lstrip()) - self.test_case_args = self._setup() - - # Private Protocol ########## - def _setup(self): - super(ScilabCodeEvaluator, self)._setup() - - ref_path, test_case_path = self._set_test_code_file_path(self.ref_code_path) - self.submit_path = self.create_submit_code_file('function.sci') - - return ref_path, # Return as a tuple - - def _teardown(self): - # Delete the created file. - super(ScilabCodeEvaluator, self)._teardown() - os.remove(self.submit_path) - - def _check_code(self, ref_path): - success = False - - # Throw message if there are commmands that terminates scilab - add_err="" - if self.terminate_commands: - add_err = "Please do not use exit, quit and abort commands in your\ - code.\n Otherwise your code will not be evaluated\ - correctly.\n" - - cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path) - cmd += ' | timeout 8 scilab-cli -nb' - ret = self._run_command(cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc, stdout, stderr = ret - - # Get only the error. - stderr = self._get_error(stdout) - if stderr is None: - # Clean output - stdout = self._strip_output(stdout) - if proc.returncode == 5: - success, err = True, "Correct answer" - else: - err = add_err + stdout - else: - err = add_err + stderr - - return success, err - - def _remove_scilab_exit(self, string): - """ - Removes exit, quit and abort from the scilab code - """ - new_string = "" - terminate_commands = False - for line in string.splitlines(): - new_line = re.sub(r"exit.*$", "", line) - new_line = re.sub(r"quit.*$", "", new_line) - new_line = re.sub(r"abort.*$", "", new_line) - if line != new_line: - terminate_commands = True - new_string = new_string + '\n' + new_line - return new_string, terminate_commands - - def _get_error(self, string): - """ - Fetches only the error from the string. - Returns None if no error. - """ - obj = re.search("!.+\n.+", string) - if obj: - return obj.group() - return None - - def _strip_output(self, out): - """ - Cleans whitespace from the output - """ - strip_out = "Message" - for l in out.split('\n'): - if l.strip(): - strip_out = strip_out+"\n"+l.strip() - return strip_out - |