summaryrefslogtreecommitdiff
path: root/yaksh
diff options
context:
space:
mode:
authorankitjavalkar2016-12-07 14:59:07 +0530
committerankitjavalkar2016-12-20 12:46:02 +0530
commitdee13fa4f8006d5266c02d6290b0e98d31413a9f (patch)
treed34bfed626a5e23689047318eda966c2495a2cfa /yaksh
parent1400eeb1d5af1cd1d69e015a19a319ab35d357c4 (diff)
downloadonline_test-dee13fa4f8006d5266c02d6290b0e98d31413a9f.tar.gz
online_test-dee13fa4f8006d5266c02d6290b0e98d31413a9f.tar.bz2
online_test-dee13fa4f8006d5266c02d6290b0e98d31413a9f.zip
Refactor code server and python evaluator
Diffstat (limited to 'yaksh')
-rw-r--r--yaksh/code_evaluator.py79
-rw-r--r--[-rwxr-xr-x]yaksh/code_server.py21
-rw-r--r--yaksh/evaluator_tests/old_test_python_evaluation.py549
-rw-r--r--yaksh/evaluator_tests/test_python_evaluation.py687
-rw-r--r--yaksh/language_registry.py15
-rwxr-xr-xyaksh/old_code_server.py221
-rw-r--r--yaksh/python_assertion_evaluator.py63
-rw-r--r--yaksh/settings.py36
8 files changed, 1491 insertions, 180 deletions
diff --git a/yaksh/code_evaluator.py b/yaksh/code_evaluator.py
index afe18c3..5ede63d 100644
--- a/yaksh/code_evaluator.py
+++ b/yaksh/code_evaluator.py
@@ -19,9 +19,12 @@ except ImportError:
# Local imports
from .settings import SERVER_TIMEOUT
+from .language_registry import create_evaluator_instance
+
MY_DIR = abspath(dirname(__file__))
+registry = None
# Raised when the code times-out.
# c.f. http://pguides.net/python/timeout-a-function
@@ -63,7 +66,8 @@ class CodeEvaluator(object):
self.timeout_msg = msg
self.in_dir = in_dir
- def evaluate(self, **kwargs):
+
+ def evaluate(self, kwargs): #language, test_case_type,
"""Evaluates given code with the test cases based on
given arguments in test_case_data.
@@ -85,6 +89,9 @@ class CodeEvaluator(object):
A tuple: (success, error message, weight).
"""
+ # self.language = language
+ # self.test_case_type = test_case_type
+
self.setup()
success, error, weight = self.safe_evaluate(**kwargs)
self.teardown()
@@ -99,11 +106,13 @@ class CodeEvaluator(object):
os.makedirs(self.in_dir)
self._change_dir(self.in_dir)
- def safe_evaluate(self, user_answer, partial_grading, test_case_data, file_paths=None):
+ def safe_evaluate(self, **kwargs): #user_answer, partial_grading, test_case_data, file_paths=None
"""
Handles code evaluation along with compilation, signal handling
and Exception handling
"""
+ metadata = kwargs.get('metadata') # metadata contains user_answer, language, partial_grading, file_paths
+ test_case_data = kwargs.get('test_case_data')
# Add a new signal handler for the execution of this code.
prev_handler = create_signal_handler()
@@ -114,14 +123,16 @@ class CodeEvaluator(object):
# Do whatever testing needed.
try:
+ # Run evaluator selection registry here
for idx, test_case in enumerate(test_case_data):
+ test_case_instance = create_evaluator_instance(metadata, test_case) #language, test_case
test_case_success = False
- self.compile_code(user_answer, file_paths, **test_case)
- test_case_success, err, test_case_weight = self.check_code(user_answer,
- file_paths,
- partial_grading,
- **test_case
- )
+ test_case_instance.compile_code() #user_answer, file_paths, test_case
+ test_case_success, err, test_case_weight = test_case_instance.check_code() #**kwargs
+ # user_answer,
+ # file_paths,
+ # partial_grading,
+ # **test_case
if test_case_success:
weight += test_case_weight
@@ -135,7 +146,7 @@ class CodeEvaluator(object):
except OSError:
msg = traceback.format_exc(limit=0)
error = "Error: {0}".format(msg)
- except Exception:
+ except Exception as e:
exc_type, exc_value, exc_tb = sys.exc_info()
tb_list = traceback.format_exception(exc_type, exc_value, exc_tb)
if len(tb_list) > 2:
@@ -147,6 +158,56 @@ class CodeEvaluator(object):
return success, error, weight
+ # def safe_evaluate(self, user_answer, partial_grading, test_case_data, file_paths=None):
+ # """
+ # Handles code evaluation along with compilation, signal handling
+ # and Exception handling
+ # """
+
+ # # Add a new signal handler for the execution of this code.
+ # prev_handler = create_signal_handler()
+ # success = False
+ # test_case_success_status = [False] * len(test_case_data)
+ # error = ""
+ # weight = 0.0
+
+ # # Do whatever testing needed.
+ # try:
+ # for idx, test_case in enumerate(test_case_data):
+ # test_case_success = False
+ # self.compile_code(user_answer, file_paths, **test_case)
+ # test_case_success, err, test_case_weight = self.check_code(user_answer,
+ # file_paths,
+ # partial_grading,
+ # **test_case
+ # )
+ # if test_case_success:
+ # weight += test_case_weight
+
+ # error += err + "\n"
+ # test_case_success_status[idx] = test_case_success
+
+ # success = all(test_case_success_status)
+
+ # except TimeoutException:
+ # error = self.timeout_msg
+ # except OSError:
+ # msg = traceback.format_exc(limit=0)
+ # error = "Error: {0}".format(msg)
+ # except Exception as e:
+ # print "HELLOOOOO", e
+ # exc_type, exc_value, exc_tb = sys.exc_info()
+ # tb_list = traceback.format_exception(exc_type, exc_value, exc_tb)
+ # if len(tb_list) > 2:
+ # del tb_list[1:3]
+ # error = "Error: {0}".format("".join(tb_list))
+ # finally:
+ # # Set back any original signal handler.
+ # set_original_signal_handler(prev_handler)
+
+ # return success, error, weight
+
+
def teardown(self):
# Cancel the signal
delete_signal_handler()
diff --git a/yaksh/code_server.py b/yaksh/code_server.py
index b3c9c30..abe7cd8 100755..100644
--- a/yaksh/code_server.py
+++ b/yaksh/code_server.py
@@ -53,7 +53,7 @@ from tornado.web import Application, RequestHandler
# Local imports
from .settings import SERVER_PORTS, SERVER_POOL_PORT
-from .language_registry import create_evaluator_instance, unpack_json
+from .language_registry import create_evaluator_instance
MY_DIR = abspath(dirname(__file__))
@@ -84,19 +84,24 @@ class CodeServer(object):
"""Calls relevant EvaluateCode class based on language to check the
answer code
"""
- code_evaluator = create_evaluator_instance(language,
- test_case_type,
- json_data,
- in_dir
- )
- data = unpack_json(json_data)
- result = code_evaluator.evaluate(**data)
+ # code_evaluator = create_evaluator_instance(language,
+ # test_case_type,
+ # json_data,
+ # in_dir
+ # )
+ data = unpack_json_to_python_obj(json_data)
+ code_eval_instance = CodeEvaluator(in_dir)
+ result = code_eval_instance.evaluate(**data) #language, test_case_type,
# Put us back into the server pool queue since we are free now.
self.queue.put(self.port)
return json.dumps(result)
+ def unpack_json_to_python_obj(self, json_data):
+ data = json.loads(json_data)
+ return data
+
def run(self):
"""Run XMLRPC server, serving our methods."""
server = SimpleXMLRPCServer(("0.0.0.0", self.port))
diff --git a/yaksh/evaluator_tests/old_test_python_evaluation.py b/yaksh/evaluator_tests/old_test_python_evaluation.py
new file mode 100644
index 0000000..9796fa2
--- /dev/null
+++ b/yaksh/evaluator_tests/old_test_python_evaluation.py
@@ -0,0 +1,549 @@
+from __future__ import unicode_literals
+import unittest
+import os
+import tempfile
+import shutil
+from textwrap import dedent
+
+# Local import
+from yaksh.python_assertion_evaluator import PythonAssertionEvaluator
+from yaksh.python_stdio_evaluator import PythonStdioEvaluator
+from yaksh.settings import SERVER_TIMEOUT
+
+
+class PythonAssertionEvaluationTestCases(unittest.TestCase):
+ def setUp(self):
+ with open('/tmp/test.txt', 'wb') as f:
+ f.write('2'.encode('ascii'))
+ tmp_in_dir_path = tempfile.mkdtemp()
+ self.in_dir = tmp_in_dir_path
+ self.test_case_data = [{"test_case": 'assert(add(1,2)==3)', 'weight': 0.0},
+ {"test_case": 'assert(add(-1,2)==1)', 'weight': 0.0},
+ {"test_case": 'assert(add(-1,-2)==-3)', 'weight': 0.0},
+ ]
+ self.timeout_msg = ("Code took more than {0} seconds to run. "
+ "You probably have an infinite loop in"
+ " your code.").format(SERVER_TIMEOUT)
+ self.file_paths = None
+
+ def tearDown(self):
+ os.remove('/tmp/test.txt')
+ shutil.rmtree(self.in_dir)
+
+ def test_correct_answer(self):
+ # Given
+ user_answer = "def add(a,b):\n\treturn a + b"
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertTrue(result.get('success'))
+ self.assertIn("Correct answer", result.get('error'))
+
+ def test_incorrect_answer(self):
+ # Given
+ user_answer = "def add(a,b):\n\treturn a - b"
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertFalse(result.get('success'))
+ self.assertIn('AssertionError in: assert(add(1,2)==3)',
+ result.get('error')
+ )
+ self.assertIn('AssertionError in: assert(add(-1,2)==1)',
+ result.get('error')
+ )
+ self.assertIn('AssertionError in: assert(add(-1,-2)==-3)',
+ result.get('error')
+ )
+
+ def test_partial_incorrect_answer(self):
+ # Given
+ user_answer = "def add(a,b):\n\treturn abs(a) + abs(b)"
+ test_case_data = [{"test_case": 'assert(add(-1,2)==1)', 'weight': 1.0},
+ {"test_case": 'assert(add(-1,-2)==-3)', 'weight': 1.0},
+ {"test_case": 'assert(add(1,2)==3)', 'weight': 2.0}
+ ]
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': True
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertFalse(result.get('success'))
+ self.assertEqual(result.get('weight'), 2.0)
+ self.assertIn('AssertionError in: assert(add(-1,2)==1)',
+ result.get('error')
+ )
+ self.assertIn('AssertionError in: assert(add(-1,-2)==-3)',
+ result.get('error')
+ )
+
+ def test_infinite_loop(self):
+ # Given
+ user_answer = "def add(a, b):\n\twhile True:\n\t\tpass"
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertFalse(result.get('success'))
+ self.assertEqual(result.get('error'), self.timeout_msg)
+
+ def test_syntax_error(self):
+ # Given
+ user_answer = dedent("""
+ def add(a, b);
+ return a + b
+ """)
+ syntax_error_msg = ["Traceback",
+ "call",
+ "File",
+ "line",
+ "<string>",
+ "SyntaxError",
+ "invalid syntax"
+ ]
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+ err = result.get("error").splitlines()
+
+ # Then
+ self.assertFalse(result.get("success"))
+ self.assertEqual(5, len(err))
+ for msg in syntax_error_msg:
+ self.assertIn(msg, result.get("error"))
+
+ def test_indent_error(self):
+ # Given
+ user_answer = dedent("""
+ def add(a, b):
+ return a + b
+ """)
+ indent_error_msg = ["Traceback", "call",
+ "File",
+ "line",
+ "<string>",
+ "IndentationError",
+ "indented block"
+ ]
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+ err = result.get("error").splitlines()
+
+ # Then
+ self.assertFalse(result.get("success"))
+ self.assertEqual(5, len(err))
+ for msg in indent_error_msg:
+ self.assertIn(msg, result.get("error"))
+
+ def test_name_error(self):
+ # Given
+ user_answer = ""
+ name_error_msg = ["Traceback",
+ "call",
+ "NameError",
+ "name",
+ "defined"
+ ]
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+ err = result.get("error").splitlines()
+
+ # Then
+ self.assertFalse(result.get("success"))
+ self.assertEqual(3, len(err))
+ for msg in name_error_msg:
+ self.assertIn(msg, result.get("error"))
+
+ def test_recursion_error(self):
+ # Given
+ user_answer = dedent("""
+ def add(a, b):
+ return add(3, 3)
+ """)
+ recursion_error_msg = ["Traceback",
+ "call",
+ "maximum recursion depth exceeded"
+ ]
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+ err = result.get("error").splitlines()
+
+ # Then
+ self.assertFalse(result.get("success"))
+ for msg in recursion_error_msg:
+ self.assertIn(msg, result.get("error"))
+
+ def test_type_error(self):
+ # Given
+ user_answer = dedent("""
+ def add(a):
+ return a + b
+ """)
+ type_error_msg = ["Traceback",
+ "call",
+ "TypeError",
+ "argument"
+ ]
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+ err = result.get("error").splitlines()
+
+ # Then
+ self.assertFalse(result.get("success"))
+ self.assertEqual(3, len(err))
+ for msg in type_error_msg:
+ self.assertIn(msg, result.get("error"))
+
+ def test_value_error(self):
+ # Given
+ user_answer = dedent("""
+ def add(a, b):
+ c = 'a'
+ return int(a) + int(b) + int(c)
+ """)
+ value_error_msg = ["Traceback",
+ "call",
+ "ValueError",
+ "invalid literal",
+ "base"
+ ]
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+ err = result.get("error").splitlines()
+
+ # Then
+ self.assertFalse(result.get("success"))
+ self.assertEqual(4, len(err))
+ for msg in value_error_msg:
+ self.assertIn(msg, result.get("error"))
+
+ def test_file_based_assert(self):
+ # Given
+ self.test_case_data = [{"test_case": "assert(ans()=='2')", "weight": 0.0}]
+ self.file_paths = [('/tmp/test.txt', False)]
+ user_answer = dedent("""
+ def ans():
+ with open("test.txt") as f:
+ return f.read()[0]
+ """)
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertIn("Correct answer", result.get('error'))
+ self.assertTrue(result.get('success'))
+
+ def test_single_testcase_error(self):
+ # Given
+ """ Tests the user answer with just an incorrect test case """
+
+ user_answer = "def palindrome(a):\n\treturn a == a[::-1]"
+ test_case_data = [{"test_case": 's="abbb"\nasert palindrome(s)==False',
+ "weight": 0.0
+ }
+ ]
+ syntax_error_msg = ["Traceback",
+ "call",
+ "File",
+ "line",
+ "<string>",
+ "SyntaxError",
+ "invalid syntax"
+ ]
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+ err = result.get("error").splitlines()
+
+ # Then
+ self.assertFalse(result.get("success"))
+ self.assertEqual(5, len(err))
+ for msg in syntax_error_msg:
+ self.assertIn(msg, result.get("error"))
+
+
+ def test_multiple_testcase_error(self):
+ """ Tests the user answer with an correct test case
+ first and then with an incorrect test case """
+ # Given
+ user_answer = "def palindrome(a):\n\treturn a == a[::-1]"
+ test_case_data = [{"test_case": 'assert(palindrome("abba")==True)',
+ "weight": 0.0
+ },
+ {"test_case": 's="abbb"\nassert palindrome(S)==False',
+ "weight": 0.0
+ }
+ ]
+ name_error_msg = ["Traceback",
+ "call",
+ "File",
+ "line",
+ "<string>",
+ "NameError",
+ "name 'S' is not defined"
+ ]
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonAssertionEvaluator()
+ result = evaluator.evaluate(**kwargs)
+ err = result.get("error").splitlines()
+
+ # Then
+ self.assertFalse(result.get("success"))
+ self.assertEqual(3, len(err))
+ for msg in name_error_msg:
+ self.assertIn(msg, result.get("error"))
+
+
+class PythonStdIOEvaluationTestCases(unittest.TestCase):
+ def setUp(self):
+ with open('/tmp/test.txt', 'wb') as f:
+ f.write('2'.encode('ascii'))
+ self.file_paths = None
+
+ def test_correct_answer_integer(self):
+ # Given
+ self.test_case_data = [{"expected_input": "1\n2",
+ "expected_output": "3",
+ "weight": 0.0
+ }]
+ user_answer = dedent("""
+ a = int(input())
+ b = int(input())
+ print(a+b)
+ """
+ )
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonStdioEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertTrue(result.get('success'))
+ self.assertIn("Correct answer", result.get('error'))
+
+ def test_correct_answer_list(self):
+ # Given
+ self.test_case_data = [{"expected_input": "1,2,3\n5,6,7",
+ "expected_output": "[1, 2, 3, 5, 6, 7]",
+ "weight": 0.0
+ }]
+ user_answer = dedent("""
+ from six.moves import input
+ input_a = input()
+ input_b = input()
+ a = [int(i) for i in input_a.split(',')]
+ b = [int(i) for i in input_b.split(',')]
+ print(a+b)
+ """
+ )
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonStdioEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertTrue(result.get('success'))
+ self.assertIn("Correct answer", result.get('error'))
+
+ def test_correct_answer_string(self):
+ # Given
+ self.test_case_data = [{"expected_input": ("the quick brown fox jumps over the lazy dog\nthe"),
+ "expected_output": "2",
+ "weight": 0.0
+ }]
+ user_answer = dedent("""
+ from six.moves import input
+ a = str(input())
+ b = str(input())
+ print(a.count(b))
+ """
+ )
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonStdioEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertTrue(result.get('success'))
+ self.assertIn("Correct answer", result.get('error'))
+
+ def test_incorrect_answer_integer(self):
+ # Given
+ self.test_case_data = [{"expected_input": "1\n2",
+ "expected_output": "3",
+ "weight": 0.0
+ }]
+ user_answer = dedent("""
+ a = int(input())
+ b = int(input())
+ print(a-b)
+ """
+ )
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonStdioEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertFalse(result.get('success'))
+ self.assertIn("Incorrect answer", result.get('error'))
+
+ def test_file_based_answer(self):
+ # Given
+ self.test_case_data = [{"expected_input": "",
+ "expected_output": "2",
+ "weight": 0.0
+ }]
+ self.file_paths = [('/tmp/test.txt', False)]
+
+ user_answer = dedent("""
+ with open("test.txt") as f:
+ a = f.read()
+ print(a[0])
+ """
+ )
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': self.test_case_data,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonStdioEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertEqual(result.get('error'), "Correct answer\n")
+ self.assertTrue(result.get('success'))
+
+ def test_infinite_loop(self):
+ # Given
+ test_case_data = [{"expected_input": "1\n2",
+ "expected_output": "3",
+ "weight": 0.0
+ }]
+ timeout_msg = ("Code took more than {0} seconds to run. "
+ "You probably have an infinite loop in"
+ " your code.").format(SERVER_TIMEOUT)
+ user_answer = "while True:\n\tpass"
+ kwargs = {'user_answer': user_answer,
+ 'test_case_data': test_case_data,
+ 'partial_grading': False
+ }
+
+ # When
+ evaluator = PythonStdioEvaluator()
+ result = evaluator.evaluate(**kwargs)
+
+ # Then
+ self.assertEqual(result.get('error'), timeout_msg)
+ self.assertFalse(result.get('success'))
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/yaksh/evaluator_tests/test_python_evaluation.py b/yaksh/evaluator_tests/test_python_evaluation.py
index acf5d0a..43cd0ea 100644
--- a/yaksh/evaluator_tests/test_python_evaluation.py
+++ b/yaksh/evaluator_tests/test_python_evaluation.py
@@ -6,25 +6,27 @@ import shutil
from textwrap import dedent
# Local import
+from yaksh.code_evaluator import CodeEvaluator
from yaksh.python_assertion_evaluator import PythonAssertionEvaluator
from yaksh.python_stdio_evaluator import PythonStdioEvaluator
from yaksh.settings import SERVER_TIMEOUT
-
class PythonAssertionEvaluationTestCases(unittest.TestCase):
def setUp(self):
with open('/tmp/test.txt', 'wb') as f:
f.write('2'.encode('ascii'))
tmp_in_dir_path = tempfile.mkdtemp()
self.in_dir = tmp_in_dir_path
- self.test_case_data = [{"test_case": 'assert(add(1,2)==3)', 'weight': 0.0},
- {"test_case": 'assert(add(-1,2)==1)', 'weight': 0.0},
- {"test_case": 'assert(add(-1,-2)==-3)', 'weight': 0.0},
+ self.test_case_data = [{"test_case_type": "standardtestcase", "test_case": 'assert(add(1,2)==3)', 'weight': 0.0},
+ {"test_case_type": "standardtestcase", "test_case": 'assert(add(-1,2)==1)', 'weight': 0.0},
+ {"test_case_type": "standardtestcase", "test_case": 'assert(add(-1,-2)==-3)', 'weight': 0.0},
]
self.timeout_msg = ("Code took more than {0} seconds to run. "
"You probably have an infinite loop in"
" your code.").format(SERVER_TIMEOUT)
self.file_paths = None
+ self.language = 'python'
+ self.test_case_type = 'standardtestcase'
def tearDown(self):
os.remove('/tmp/test.txt')
@@ -33,15 +35,24 @@ class PythonAssertionEvaluationTestCases(unittest.TestCase):
def test_correct_answer(self):
# Given
user_answer = "def add(a,b):\n\treturn a + b"
- kwargs = {'user_answer': user_answer,
- 'test_case_data': self.test_case_data,
- 'file_paths': self.file_paths,
- 'partial_grading': False
+ # kwargs = {'user_answer': user_answer,
+ # 'test_case_data': self.test_case_data,
+ # 'file_paths': self.file_paths,
+ # 'partial_grading': False,
+ # }
+ kwargs = {
+ 'metadata': {
+ 'user_answer': user_answer,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False,
+ 'language': 'python'
+ },
+ 'test_case_data': self.test_case_data,
}
# When
- evaluator = PythonAssertionEvaluator()
- result = evaluator.evaluate(**kwargs)
+ evaluator = CodeEvaluator(self.in_dir)
+ result = evaluator.evaluate(kwargs)
# Then
self.assertTrue(result.get('success'))
@@ -50,15 +61,20 @@ class PythonAssertionEvaluationTestCases(unittest.TestCase):
def test_incorrect_answer(self):
# Given
user_answer = "def add(a,b):\n\treturn a - b"
- kwargs = {'user_answer': user_answer,
- 'test_case_data': self.test_case_data,
- 'file_paths': self.file_paths,
- 'partial_grading': False
+ kwargs = {
+ 'metadata': {
+ 'user_answer': user_answer,
+ 'file_paths': self.file_paths,
+ 'partial_grading': False,
+ 'language': 'python'
+ },
+ 'test_case_data': self.test_case_data,
}
# When
- evaluator = PythonAssertionEvaluator()
- result = evaluator.evaluate(**kwargs)
+ # evaluator = PythonAssertionEvaluator()
+ evaluator = CodeEvaluator(self.in_dir)
+ result = evaluator.evaluate(kwargs)
# Then
self.assertFalse(result.get('success'))
@@ -72,111 +88,6 @@ class PythonAssertionEvaluationTestCases(unittest.TestCase):
result.get('error')
)
- def test_partial_incorrect_answer(self):
- # Given
- user_answer = "def add(a,b):\n\treturn abs(a) + abs(b)"
- test_case_data = [{"test_case": 'assert(add(-1,2)==1)', 'weight': 1.0},
- {"test_case": 'assert(add(-1,-2)==-3)', 'weight': 1.0},
- {"test_case": 'assert(add(1,2)==3)', 'weight': 2.0}
- ]
- kwargs = {'user_answer': user_answer,
- 'test_case_data': test_case_data,
- 'file_paths': self.file_paths,
- 'partial_grading': True
- }
-
- # When
- evaluator = PythonAssertionEvaluator()
- result = evaluator.evaluate(**kwargs)
-
- # Then
- self.assertFalse(result.get('success'))
- self.assertEqual(result.get('weight'), 2.0)
- self.assertIn('AssertionError in: assert(add(-1,2)==1)',
- result.get('error')
- )
- self.assertIn('AssertionError in: assert(add(-1,-2)==-3)',
- result.get('error')
- )
-
- def test_infinite_loop(self):
- # Given
- user_answer = "def add(a, b):\n\twhile True:\n\t\tpass"
- kwargs = {'user_answer': user_answer,
- 'test_case_data': self.test_case_data,
- 'file_paths': self.file_paths,
- 'partial_grading': False
- }
-
- # When
- evaluator = PythonAssertionEvaluator()
- result = evaluator.evaluate(**kwargs)
-
- # Then
- self.assertFalse(result.get('success'))
- self.assertEqual(result.get('error'), self.timeout_msg)
-
- def test_syntax_error(self):
- # Given
- user_answer = dedent("""
- def add(a, b);
- return a + b
- """)
- syntax_error_msg = ["Traceback",
- "call",
- "File",
- "line",
- "<string>",
- "SyntaxError",
- "invalid syntax"
- ]
- kwargs = {'user_answer': user_answer,
- 'test_case_data': self.test_case_data,
- 'file_paths': self.file_paths,
- 'partial_grading': False
- }
-
- # When
- evaluator = PythonAssertionEvaluator()
- result = evaluator.evaluate(**kwargs)
- err = result.get("error").splitlines()
-
- # Then
- self.assertFalse(result.get("success"))
- self.assertEqual(5, len(err))
- for msg in syntax_error_msg:
- self.assertIn(msg, result.get("error"))
-
- def test_indent_error(self):
- # Given
- user_answer = dedent("""
- def add(a, b):
- return a + b
- """)
- indent_error_msg = ["Traceback", "call",
- "File",
- "line",
- "<string>",
- "IndentationError",
- "indented block"
- ]
- kwargs = {'user_answer': user_answer,
- 'test_case_data': self.test_case_data,
- 'file_paths': self.file_paths,
- 'partial_grading': False
-
- }
-
- # When
- evaluator = PythonAssertionEvaluator()
- result = evaluator.evaluate(**kwargs)
- err = result.get("error").splitlines()
-
- # Then
- self.assertFalse(result.get("success"))
- self.assertEqual(5, len(err))
- for msg in indent_error_msg:
- self.assertIn(msg, result.get("error"))
def test_name_error(self):
# Given
@@ -542,5 +453,539 @@ class PythonStdIOEvaluationTestCases(unittest.TestCase):
self.assertEqual(result.get('error'), timeout_msg)
self.assertFalse(result.get('success'))
+# class PythonAssertionEvaluationTestCases(unittest.TestCase):
+# def setUp(self):
+# with open('/tmp/test.txt', 'wb') as f:
+# f.write('2'.encode('ascii'))
+# tmp_in_dir_path = tempfile.mkdtemp()
+# self.in_dir = tmp_in_dir_path
+# self.test_case_data = [{"test_case": 'assert(add(1,2)==3)', 'weight': 0.0},
+# {"test_case": 'assert(add(-1,2)==1)', 'weight': 0.0},
+# {"test_case": 'assert(add(-1,-2)==-3)', 'weight': 0.0},
+# ]
+# self.timeout_msg = ("Code took more than {0} seconds to run. "
+# "You probably have an infinite loop in"
+# " your code.").format(SERVER_TIMEOUT)
+# self.file_paths = None
+
+# def tearDown(self):
+# os.remove('/tmp/test.txt')
+# shutil.rmtree(self.in_dir)
+
+# def test_correct_answer(self):
+# # Given
+# user_answer = "def add(a,b):\n\treturn a + b"
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertTrue(result.get('success'))
+# self.assertIn("Correct answer", result.get('error'))
+
+# def test_incorrect_answer(self):
+# # Given
+# user_answer = "def add(a,b):\n\treturn a - b"
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertFalse(result.get('success'))
+# self.assertIn('AssertionError in: assert(add(1,2)==3)',
+# result.get('error')
+# )
+# self.assertIn('AssertionError in: assert(add(-1,2)==1)',
+# result.get('error')
+# )
+# self.assertIn('AssertionError in: assert(add(-1,-2)==-3)',
+# result.get('error')
+# )
+
+# def test_partial_incorrect_answer(self):
+# # Given
+# user_answer = "def add(a,b):\n\treturn abs(a) + abs(b)"
+# test_case_data = [{"test_case": 'assert(add(-1,2)==1)', 'weight': 1.0},
+# {"test_case": 'assert(add(-1,-2)==-3)', 'weight': 1.0},
+# {"test_case": 'assert(add(1,2)==3)', 'weight': 2.0}
+# ]
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': True
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertFalse(result.get('success'))
+# self.assertEqual(result.get('weight'), 2.0)
+# self.assertIn('AssertionError in: assert(add(-1,2)==1)',
+# result.get('error')
+# )
+# self.assertIn('AssertionError in: assert(add(-1,-2)==-3)',
+# result.get('error')
+# )
+
+# def test_infinite_loop(self):
+# # Given
+# user_answer = "def add(a, b):\n\twhile True:\n\t\tpass"
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertFalse(result.get('success'))
+# self.assertEqual(result.get('error'), self.timeout_msg)
+
+# def test_syntax_error(self):
+# # Given
+# user_answer = dedent("""
+# def add(a, b);
+# return a + b
+# """)
+# syntax_error_msg = ["Traceback",
+# "call",
+# "File",
+# "line",
+# "<string>",
+# "SyntaxError",
+# "invalid syntax"
+# ]
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+# err = result.get("error").splitlines()
+
+# # Then
+# self.assertFalse(result.get("success"))
+# self.assertEqual(5, len(err))
+# for msg in syntax_error_msg:
+# self.assertIn(msg, result.get("error"))
+
+# def test_indent_error(self):
+# # Given
+# user_answer = dedent("""
+# def add(a, b):
+# return a + b
+# """)
+# indent_error_msg = ["Traceback", "call",
+# "File",
+# "line",
+# "<string>",
+# "IndentationError",
+# "indented block"
+# ]
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+# err = result.get("error").splitlines()
+
+# # Then
+# self.assertFalse(result.get("success"))
+# self.assertEqual(5, len(err))
+# for msg in indent_error_msg:
+# self.assertIn(msg, result.get("error"))
+
+# def test_name_error(self):
+# # Given
+# user_answer = ""
+# name_error_msg = ["Traceback",
+# "call",
+# "NameError",
+# "name",
+# "defined"
+# ]
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+# err = result.get("error").splitlines()
+
+# # Then
+# self.assertFalse(result.get("success"))
+# self.assertEqual(3, len(err))
+# for msg in name_error_msg:
+# self.assertIn(msg, result.get("error"))
+
+# def test_recursion_error(self):
+# # Given
+# user_answer = dedent("""
+# def add(a, b):
+# return add(3, 3)
+# """)
+# recursion_error_msg = ["Traceback",
+# "call",
+# "maximum recursion depth exceeded"
+# ]
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+# err = result.get("error").splitlines()
+
+# # Then
+# self.assertFalse(result.get("success"))
+# for msg in recursion_error_msg:
+# self.assertIn(msg, result.get("error"))
+
+# def test_type_error(self):
+# # Given
+# user_answer = dedent("""
+# def add(a):
+# return a + b
+# """)
+# type_error_msg = ["Traceback",
+# "call",
+# "TypeError",
+# "argument"
+# ]
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+# err = result.get("error").splitlines()
+
+# # Then
+# self.assertFalse(result.get("success"))
+# self.assertEqual(3, len(err))
+# for msg in type_error_msg:
+# self.assertIn(msg, result.get("error"))
+
+# def test_value_error(self):
+# # Given
+# user_answer = dedent("""
+# def add(a, b):
+# c = 'a'
+# return int(a) + int(b) + int(c)
+# """)
+# value_error_msg = ["Traceback",
+# "call",
+# "ValueError",
+# "invalid literal",
+# "base"
+# ]
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+# err = result.get("error").splitlines()
+
+# # Then
+# self.assertFalse(result.get("success"))
+# self.assertEqual(4, len(err))
+# for msg in value_error_msg:
+# self.assertIn(msg, result.get("error"))
+
+# def test_file_based_assert(self):
+# # Given
+# self.test_case_data = [{"test_case": "assert(ans()=='2')", "weight": 0.0}]
+# self.file_paths = [('/tmp/test.txt', False)]
+# user_answer = dedent("""
+# def ans():
+# with open("test.txt") as f:
+# return f.read()[0]
+# """)
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertIn("Correct answer", result.get('error'))
+# self.assertTrue(result.get('success'))
+
+# def test_single_testcase_error(self):
+# # Given
+# """ Tests the user answer with just an incorrect test case """
+
+# user_answer = "def palindrome(a):\n\treturn a == a[::-1]"
+# test_case_data = [{"test_case": 's="abbb"\nasert palindrome(s)==False',
+# "weight": 0.0
+# }
+# ]
+# syntax_error_msg = ["Traceback",
+# "call",
+# "File",
+# "line",
+# "<string>",
+# "SyntaxError",
+# "invalid syntax"
+# ]
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+# err = result.get("error").splitlines()
+
+# # Then
+# self.assertFalse(result.get("success"))
+# self.assertEqual(5, len(err))
+# for msg in syntax_error_msg:
+# self.assertIn(msg, result.get("error"))
+
+
+# def test_multiple_testcase_error(self):
+# """ Tests the user answer with an correct test case
+# first and then with an incorrect test case """
+# # Given
+# user_answer = "def palindrome(a):\n\treturn a == a[::-1]"
+# test_case_data = [{"test_case": 'assert(palindrome("abba")==True)',
+# "weight": 0.0
+# },
+# {"test_case": 's="abbb"\nassert palindrome(S)==False',
+# "weight": 0.0
+# }
+# ]
+# name_error_msg = ["Traceback",
+# "call",
+# "File",
+# "line",
+# "<string>",
+# "NameError",
+# "name 'S' is not defined"
+# ]
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonAssertionEvaluator()
+# result = evaluator.evaluate(**kwargs)
+# err = result.get("error").splitlines()
+
+# # Then
+# self.assertFalse(result.get("success"))
+# self.assertEqual(3, len(err))
+# for msg in name_error_msg:
+# self.assertIn(msg, result.get("error"))
+
+
+# class PythonStdIOEvaluationTestCases(unittest.TestCase):
+# def setUp(self):
+# with open('/tmp/test.txt', 'wb') as f:
+# f.write('2'.encode('ascii'))
+# self.file_paths = None
+
+# def test_correct_answer_integer(self):
+# # Given
+# self.test_case_data = [{"expected_input": "1\n2",
+# "expected_output": "3",
+# "weight": 0.0
+# }]
+# user_answer = dedent("""
+# a = int(input())
+# b = int(input())
+# print(a+b)
+# """
+# )
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonStdioEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertTrue(result.get('success'))
+# self.assertIn("Correct answer", result.get('error'))
+
+# def test_correct_answer_list(self):
+# # Given
+# self.test_case_data = [{"expected_input": "1,2,3\n5,6,7",
+# "expected_output": "[1, 2, 3, 5, 6, 7]",
+# "weight": 0.0
+# }]
+# user_answer = dedent("""
+# from six.moves import input
+# input_a = input()
+# input_b = input()
+# a = [int(i) for i in input_a.split(',')]
+# b = [int(i) for i in input_b.split(',')]
+# print(a+b)
+# """
+# )
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonStdioEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertTrue(result.get('success'))
+# self.assertIn("Correct answer", result.get('error'))
+
+# def test_correct_answer_string(self):
+# # Given
+# self.test_case_data = [{"expected_input": ("the quick brown fox jumps over the lazy dog\nthe"),
+# "expected_output": "2",
+# "weight": 0.0
+# }]
+# user_answer = dedent("""
+# from six.moves import input
+# a = str(input())
+# b = str(input())
+# print(a.count(b))
+# """
+# )
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonStdioEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertTrue(result.get('success'))
+# self.assertIn("Correct answer", result.get('error'))
+
+# def test_incorrect_answer_integer(self):
+# # Given
+# self.test_case_data = [{"expected_input": "1\n2",
+# "expected_output": "3",
+# "weight": 0.0
+# }]
+# user_answer = dedent("""
+# a = int(input())
+# b = int(input())
+# print(a-b)
+# """
+# )
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonStdioEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertFalse(result.get('success'))
+# self.assertIn("Incorrect answer", result.get('error'))
+
+# def test_file_based_answer(self):
+# # Given
+# self.test_case_data = [{"expected_input": "",
+# "expected_output": "2",
+# "weight": 0.0
+# }]
+# self.file_paths = [('/tmp/test.txt', False)]
+
+# user_answer = dedent("""
+# with open("test.txt") as f:
+# a = f.read()
+# print(a[0])
+# """
+# )
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': self.test_case_data,
+# 'file_paths': self.file_paths,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonStdioEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertEqual(result.get('error'), "Correct answer\n")
+# self.assertTrue(result.get('success'))
+
+# def test_infinite_loop(self):
+# # Given
+# test_case_data = [{"expected_input": "1\n2",
+# "expected_output": "3",
+# "weight": 0.0
+# }]
+# timeout_msg = ("Code took more than {0} seconds to run. "
+# "You probably have an infinite loop in"
+# " your code.").format(SERVER_TIMEOUT)
+# user_answer = "while True:\n\tpass"
+# kwargs = {'user_answer': user_answer,
+# 'test_case_data': test_case_data,
+# 'partial_grading': False
+# }
+
+# # When
+# evaluator = PythonStdioEvaluator()
+# result = evaluator.evaluate(**kwargs)
+
+# # Then
+# self.assertEqual(result.get('error'), timeout_msg)
+# self.assertFalse(result.get('success'))
+
if __name__ == '__main__':
unittest.main()
diff --git a/yaksh/language_registry.py b/yaksh/language_registry.py
index 0e0140b..8d3aad2 100644
--- a/yaksh/language_registry.py
+++ b/yaksh/language_registry.py
@@ -14,15 +14,16 @@ def get_registry():
registry = _LanguageRegistry()
return registry
-def unpack_json(json_data):
- data = json.loads(json_data)
- return data
+# def unpack_json_to_python_obj(json_data):
+# data = json.loads(json_data)
+# return data
-def create_evaluator_instance(language, test_case_type, json_data, in_dir):
+def create_evaluator_instance(metadata, test_case): #create_evaluator_instance
"""Create instance of relevant EvaluateCode class based on language"""
registry = get_registry()
- cls = registry.get_class(language, test_case_type)
- instance = cls(in_dir)
+ cls = registry.get_class(metadata.get('language'), test_case.get('test_case_type'))
+ # instance = cls(in_dir)
+ instance = cls(metadata, test_case)
return instance
class _LanguageRegistry(object):
@@ -36,8 +37,8 @@ class _LanguageRegistry(object):
""" Get the code evaluator class for the given language """
if not self._register.get(language):
self._register[language] = code_evaluators.get(language)
-
test_case_register = self._register[language]
+
cls = test_case_register.get(test_case_type)
module_name, class_name = cls.rsplit(".", 1)
# load the module, will raise ImportError if module cannot be loaded
diff --git a/yaksh/old_code_server.py b/yaksh/old_code_server.py
new file mode 100755
index 0000000..b3c9c30
--- /dev/null
+++ b/yaksh/old_code_server.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python
+
+"""This server runs an HTTP server (using tornado) and several code servers
+using XMLRPC that can be submitted code
+and tests and returns the output. It *should* be run as root and will run as
+the user 'nobody' so as to minimize any damange by errant code. This can be
+configured by editing settings.py to run as many servers as desired. One can
+also specify the ports on the command line. Here are examples::
+
+ $ sudo ./code_server.py
+ # Runs servers based on settings.py:SERVER_PORTS one server per port given.
+
+or::
+
+ $ sudo ./code_server.py 8001 8002 8003 8004 8005
+ # Runs 5 servers on ports specified.
+
+All these servers should be running as nobody. This will also start a server
+pool that defaults to port 50000 and is configurable in
+settings.py:SERVER_POOL_PORT. This port exposes a `get_server_port` function
+that returns an available server.
+
+"""
+
+# Standard library imports
+from __future__ import unicode_literals
+import json
+from multiprocessing import Process, Queue
+import os
+from os.path import isdir, dirname, abspath, join, isfile
+import pwd
+import re
+import signal
+import stat
+import subprocess
+import sys
+
+try:
+ from SimpleXMLRPCServer import SimpleXMLRPCServer
+except ImportError:
+ # The above import will not work on Python-3.x.
+ from xmlrpc.server import SimpleXMLRPCServer
+
+try:
+ from urllib import unquote
+except ImportError:
+ # The above import will not work on Python-3.x.
+ from urllib.parse import unquote
+
+# Library imports
+from tornado.ioloop import IOLoop
+from tornado.web import Application, RequestHandler
+
+# Local imports
+from .settings import SERVER_PORTS, SERVER_POOL_PORT
+from .language_registry import create_evaluator_instance, unpack_json
+
+
+MY_DIR = abspath(dirname(__file__))
+
+
+# Private Protocol ##########
+def run_as_nobody():
+ """Runs the current process as nobody."""
+ # Set the effective uid and to that of nobody.
+ nobody = pwd.getpwnam('nobody')
+ os.setegid(nobody.pw_gid)
+ os.seteuid(nobody.pw_uid)
+
+
+###############################################################################
+# `CodeServer` class.
+###############################################################################
+class CodeServer(object):
+ """A code server that executes user submitted test code, tests it and
+ reports if the code was correct or not.
+ """
+ def __init__(self, port, queue):
+ self.port = port
+ self.queue = queue
+
+ # Public Protocol ##########
+ def check_code(self, language, test_case_type, json_data, in_dir=None):
+ """Calls relevant EvaluateCode class based on language to check the
+ answer code
+ """
+ code_evaluator = create_evaluator_instance(language,
+ test_case_type,
+ json_data,
+ in_dir
+ )
+ data = unpack_json(json_data)
+ result = code_evaluator.evaluate(**data)
+
+ # Put us back into the server pool queue since we are free now.
+ self.queue.put(self.port)
+
+ return json.dumps(result)
+
+ def run(self):
+ """Run XMLRPC server, serving our methods."""
+ server = SimpleXMLRPCServer(("0.0.0.0", self.port))
+ self.server = server
+ server.register_instance(self)
+ self.queue.put(self.port)
+ server.serve_forever()
+
+
+###############################################################################
+# `ServerPool` class.
+###############################################################################
+class ServerPool(object):
+ """Manages a pool of CodeServer objects."""
+ def __init__(self, ports, pool_port=50000):
+ """Create a pool of servers. Uses a shared Queue to get available
+ servers.
+
+ Parameters
+ ----------
+
+ ports : list(int)
+ List of ports at which the CodeServer's should run.
+
+ pool_port : int
+ Port at which the server pool should serve.
+ """
+ self.my_port = pool_port
+ self.ports = ports
+ queue = Queue(maxsize=len(self.ports))
+ self.queue = queue
+ servers = []
+ processes = []
+ for port in self.ports:
+ server = CodeServer(port, queue)
+ servers.append(server)
+ p = Process(target=server.run)
+ processes.append(p)
+ self.servers = servers
+ self.processes = processes
+ self.app = self._make_app()
+
+ def _make_app(self):
+ app = Application([
+ (r"/.*", MainHandler, dict(server=self)),
+ ])
+ app.listen(self.my_port)
+ return app
+
+ def _start_code_servers(self):
+ for proc in self.processes:
+ if proc.pid is None:
+ proc.start()
+
+ # Public Protocol ##########
+
+ def get_server_port(self):
+ """Get available server port from ones in the pool. This will block
+ till it gets an available server.
+ """
+ return self.queue.get()
+
+ def get_status(self):
+ """Returns current queue size and total number of ports used."""
+ try:
+ qs = self.queue.qsize()
+ except NotImplementedError:
+ # May not work on OS X so we return a dummy.
+ qs = len(self.ports)
+
+ return qs, len(self.ports)
+
+ def run(self):
+ """Run server which returns an available server port where code
+ can be executed.
+ """
+ # We start the code servers here to ensure they are run as nobody.
+ self._start_code_servers()
+ IOLoop.current().start()
+
+ def stop(self):
+ """Stop all the code server processes.
+ """
+ for proc in self.processes:
+ proc.terminate()
+ IOLoop.current().stop()
+
+
+class MainHandler(RequestHandler):
+ def initialize(self, server):
+ self.server = server
+
+ def get(self):
+ path = self.request.path[1:]
+ if len(path) == 0:
+ port = self.server.get_server_port()
+ self.write(str(port))
+ elif path == "status":
+ q_size, total = self.server.get_status()
+ result = "%d servers out of %d are free.\n"%(q_size, total)
+ load = float(total - q_size)/total*100
+ result += "Load: %s%%\n"%load
+ self.write(result)
+
+
+###############################################################################
+def main(args=None):
+ if args:
+ ports = [int(x) for x in args]
+ else:
+ ports = SERVER_PORTS
+
+ server_pool = ServerPool(ports=ports, pool_port=SERVER_POOL_PORT)
+ # This is done *after* the server pool is created because when the tornado
+ # app calls listen(), it cannot be nobody.
+ run_as_nobody()
+
+ server_pool.run()
+
+if __name__ == '__main__':
+ args = sys.argv[1:]
+ main(args)
diff --git a/yaksh/python_assertion_evaluator.py b/yaksh/python_assertion_evaluator.py
index 986dbf2..5f1b29e 100644
--- a/yaksh/python_assertion_evaluator.py
+++ b/yaksh/python_assertion_evaluator.py
@@ -11,32 +11,60 @@ from .code_evaluator import CodeEvaluator, TimeoutException
from .file_utils import copy_files, delete_files
-class PythonAssertionEvaluator(CodeEvaluator):
+class PythonAssertionEvaluator(object):
"""Tests the Python code obtained from Code Server"""
- def setup(self):
- super(PythonAssertionEvaluator, self).setup()
+ def __init__(self, metadata, test_case_data):
self.exec_scope = None
self.files = []
- def teardown(self):
+ # Set metadata values
+ self.user_answer = metadata.get('user_answer')
+ self.file_paths = metadata.get('file_paths')
+ self.partial_grading = metadata.get('partial_grading')
+
+ # Set test case data values
+ self.test_case = test_case_data.get('test_case')
+ self.weight = test_case_data.get('weight')
+
+ def __del__(self):
# Delete the created file.
if self.files:
delete_files(self.files)
- super(PythonAssertionEvaluator, self).teardown()
- def compile_code(self, user_answer, file_paths, test_case, weight):
- if file_paths:
- self.files = copy_files(file_paths)
+
+ # def setup(self):
+ # super(PythonAssertionEvaluator, self).setup()
+ # self.exec_scope = None
+ # self.files = []
+
+
+ # def teardown(self):
+ # # Delete the created file.
+ # if self.files:
+ # delete_files(self.files)
+ # super(PythonAssertionEvaluator, self).teardown()
+
+ # def teardown(self):
+ # # Delete the created file.
+ # if self.files:
+ # delete_files(self.files)
+ # # Cancel the signal
+ # delete_signal_handler()
+ # self._change_dir(dirname(MY_DIR))
+
+ def compile_code(self):
+ if self.file_paths:
+ self.files = copy_files(self.file_paths)
if self.exec_scope:
return None
else:
- submitted = compile(user_answer, '<string>', mode='exec')
+ submitted = compile(self.user_answer, '<string>', mode='exec')
self.exec_scope = {}
exec(submitted, self.exec_scope)
return self.exec_scope
- def check_code(self, user_answer, file_paths, partial_grading, test_case, weight):
+ def check_code(self):
""" Function validates user answer by running an assertion based test case
against it
@@ -61,26 +89,21 @@ class PythonAssertionEvaluator(CodeEvaluator):
test_case_weight = 0.0
try:
tb = None
- _tests = compile(test_case, '<string>', mode='exec')
+ _tests = compile(self.test_case, '<string>', mode='exec')
exec(_tests, self.exec_scope)
except AssertionError:
type, value, tb = sys.exc_info()
info = traceback.extract_tb(tb)
fname, lineno, func, text = info[-1]
- text = str(test_case).splitlines()[lineno-1]
+ text = str(self.test_case).splitlines()[lineno-1]
err = ("-----\nExpected Test Case:\n{0}\n"
- "Error - {1} {2} in: {3}\n-----").format(test_case,
- type.__name__,
- str(value), text
- )
- except TimeoutException:
- raise
+ "Error - {1} {2} in: {3}\n-----").format(self.test_case, type.__name__, str(value), text)
except Exception:
msg = traceback.format_exc(limit=0)
err = "Error in Test case: {0}".format(msg)
else:
success = True
- err = '-----\nCorrect answer\nTest Case: {0}\n-----'.format(test_case)
- test_case_weight = float(weight) if partial_grading else 0.0
+ err = '-----\nCorrect answer\nTest Case: {0}\n-----'.format(self.test_case)
+ test_case_weight = float(self.weight) if self.partial_grading else 0.0
del tb
return success, err, test_case_weight
diff --git a/yaksh/settings.py b/yaksh/settings.py
index 6383999..690ddb1 100644
--- a/yaksh/settings.py
+++ b/yaksh/settings.py
@@ -19,22 +19,28 @@ SERVER_TIMEOUT = 4
# host.org/foo/exam set URL_ROOT='/foo'
URL_ROOT = ''
+# code_evaluators = {
+# "python": {"standardtestcase": "yaksh.python_assertion_evaluator.PythonAssertionEvaluator",
+# "stdiobasedtestcase": "yaksh.python_stdio_evaluator.PythonStdioEvaluator"
+# },
+# "c": {"standardtestcase": "yaksh.cpp_code_evaluator.CppCodeEvaluator",
+# "stdiobasedtestcase": "yaksh.cpp_stdio_evaluator.CppStdioEvaluator"
+# },
+# "cpp": {"standardtestcase": "yaksh.cpp_code_evaluator.CppCodeEvaluator",
+# "stdiobasedtestcase": "yaksh.cpp_stdio_evaluator.CppStdioEvaluator"
+# },
+# "java": {"standardtestcase": "yaksh.java_code_evaluator.JavaCodeEvaluator",
+# "stdiobasedtestcase": "yaksh.java_stdio_evaluator.JavaStdioEvaluator"},
+
+# "bash": {"standardtestcase": "yaksh.bash_code_evaluator.BashCodeEvaluator",
+# "stdiobasedtestcase": "yaksh.bash_stdio_evaluator.BashStdioEvaluator"
+# },
+
+# "scilab": {"standardtestcase": "yaksh.scilab_code_evaluator.ScilabCodeEvaluator"},
+# }
+
code_evaluators = {
"python": {"standardtestcase": "yaksh.python_assertion_evaluator.PythonAssertionEvaluator",
"stdiobasedtestcase": "yaksh.python_stdio_evaluator.PythonStdioEvaluator"
- },
- "c": {"standardtestcase": "yaksh.cpp_code_evaluator.CppCodeEvaluator",
- "stdiobasedtestcase": "yaksh.cpp_stdio_evaluator.CppStdioEvaluator"
- },
- "cpp": {"standardtestcase": "yaksh.cpp_code_evaluator.CppCodeEvaluator",
- "stdiobasedtestcase": "yaksh.cpp_stdio_evaluator.CppStdioEvaluator"
- },
- "java": {"standardtestcase": "yaksh.java_code_evaluator.JavaCodeEvaluator",
- "stdiobasedtestcase": "yaksh.java_stdio_evaluator.JavaStdioEvaluator"},
-
- "bash": {"standardtestcase": "yaksh.bash_code_evaluator.BashCodeEvaluator",
- "stdiobasedtestcase": "yaksh.bash_stdio_evaluator.BashStdioEvaluator"
- },
-
- "scilab": {"standardtestcase": "yaksh.scilab_code_evaluator.ScilabCodeEvaluator"},
+ }
}