From 28e2d32f9839b0e3cb3e99ce0113832627610bd7 Mon Sep 17 00:00:00 2001
From: ankitjavalkar
Date: Wed, 4 Feb 2015 20:03:13 +0530
Subject: Add test case model for testing redesign
Conflicts:
testapp/exam/models.py
testapp/exam/views.py
---
testapp/exam/code_server.py | 12 ++++++------
testapp/exam/models.py | 28 +++++++++++++++++++++++++++
testapp/exam/templates/exam/add_question.html | 3 +++
testapp/exam/views.py | 11 ++++++++++-
testapp/exam/xmlrpc_clients.py | 4 ++--
5 files changed, 49 insertions(+), 9 deletions(-)
(limited to 'testapp/exam')
diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py
index 792197d..64d6a47 100755
--- a/testapp/exam/code_server.py
+++ b/testapp/exam/code_server.py
@@ -68,7 +68,7 @@ class CodeServer(object):
'have an infinite loop in your code.' % SERVER_TIMEOUT
self.timeout_msg = msg
- def run_python_code(self, answer, test_code, in_dir=None):
+ def run_python_code(self, answer, test_code, test_obj, in_dir=None): ####
"""Tests given Python function (`answer`) with the `test_code`
supplied. If the optional `in_dir` keyword argument is supplied
it changes the directory to that directory (it does not change
@@ -123,7 +123,7 @@ class CodeServer(object):
return success, err
- def run_bash_code(self, answer, test_code, in_dir=None):
+ def run_bash_code(self, answer, test_code, test_obj, in_dir=None): ####
"""Tests given Bash code (`answer`) with the `test_code` supplied.
The testcode should typically contain two lines, the first is a path to
@@ -290,7 +290,7 @@ class CodeServer(object):
stdnt_stdout+stdnt_stderr)
return False, err
- def run_c_code(self, answer, test_code, in_dir=None):
+ def run_c_code(self, answer, test_code, test_obj, in_dir=None): ####
"""Tests given C code (`answer`) with the `test_code` supplied.
The testcode is a path to the reference code.
@@ -442,7 +442,7 @@ class CodeServer(object):
err = err + "\n" + stdnt_stderr
return success, err
- def run_cplus_code(self, answer, test_code, in_dir=None):
+ def run_cplus_code(self, answer, test_code, test_obj, in_dir=None): ####
"""Tests given C++ code (`answer`) with the `test_code` supplied.
The testcode is a path to the reference code.
@@ -504,7 +504,7 @@ class CodeServer(object):
return success, err
- def run_java_code(self, answer, test_code, in_dir=None):
+ def run_java_code(self, answer, test_code, test_obj, in_dir=None): ####
"""Tests given java code (`answer`) with the `test_code` supplied.
The testcode is a path to the reference code.
@@ -659,7 +659,7 @@ class CodeServer(object):
stripped = stripped + c
return ''.join(stripped)
- def run_scilab_code(self, answer, test_code, in_dir=None):
+ def run_scilab_code(self, answer, test_code, test_obj, in_dir=None): ####
"""Tests given Scilab function (`answer`) with the `test_code`
supplied. If the optional `in_dir` keyword argument is supplied
it changes the directory to that directory (it does not change
diff --git a/testapp/exam/models.py b/testapp/exam/models.py
index 72fb51b..df3b485 100644
--- a/testapp/exam/models.py
+++ b/testapp/exam/models.py
@@ -62,6 +62,15 @@ class Question(models.Model):
# Test cases for the question in the form of code that is run.
test = models.TextField(blank=True)
+ #Test case Keyword arguments in dict form
+ test_keyword_args = models.TextField(blank=True) ####
+
+ #Test case Positional arguments in list form
+ test_pos_args = models.TextField(blank=True) ####
+
+ #Test case Expected answer in list form
+ test_expected_answer = models.TextField(blank=True) ####
+
# Any multiple choice options. Place one option per line.
options = models.TextField(blank=True)
@@ -396,3 +405,22 @@ class AssignmentUpload(models.Model):
user = models.ForeignKey(Profile)
assignmentQuestion = models.ForeignKey(Question)
assignmentFile = models.FileField(upload_to=get_assignment_dir)
+
+
+################################################################################
+class TestCase(models.Model):
+ question = models.ForeignKey(Question)
+
+ # Test case Keyword arguments in dict form
+ test_keyword_args = models.TextField(blank=True)
+
+ # Test case Positional arguments in list form
+ test_pos_args = models.TextField(blank=True)
+
+ # Test case Expected answer in list form
+ test_expected_answer = models.TextField(blank=True)
+
+ # Is this Test Case public or not. If it is not
+ # public it will not be visible to the user
+ # public = models.BooleanField(default=False)
+
diff --git a/testapp/exam/templates/exam/add_question.html b/testapp/exam/templates/exam/add_question.html
index b0b22b1..b6ce908 100644
--- a/testapp/exam/templates/exam/add_question.html
+++ b/testapp/exam/templates/exam/add_question.html
@@ -28,6 +28,9 @@
+
+
{% for i in data %}
{% endfor %}
diff --git a/testapp/exam/views.py b/testapp/exam/views.py
index be0d292..3d22d55 100644
--- a/testapp/exam/views.py
+++ b/testapp/exam/views.py
@@ -18,8 +18,8 @@ from itertools import chain
from testapp.exam.models import Quiz, Question, QuestionPaper, QuestionSet
from testapp.exam.models import Profile, Answer, AnswerPaper, User, TestCase
from testapp.exam.forms import UserRegisterForm, UserLoginForm, QuizForm,\
- QuestionForm, RandomQuestionForm
-from testapp.exam.xmlrpc_clients import code_server
+ QuestionForm, RandomQuestionForm, TestCaseFormSet
+from exam.xmlrpc_clients import code_server
from settings import URL_ROOT
from testapp.exam.models import AssignmentUpload
@@ -281,7 +281,7 @@ def edit_quiz(request):
def edit_question(request):
- """Edit the list of questions seleted by the user for editing."""
+ """Edit the list of questions selected by the user for editing."""
user = request.user
if not user.is_authenticated() or not is_moderator(user):
raise Http404('You are not allowed to view this page!')
@@ -290,7 +290,6 @@ def edit_question(request):
summary = request.POST.getlist('summary')
description = request.POST.getlist('description')
points = request.POST.getlist('points')
- test = request.POST.getlist('test')
options = request.POST.getlist('options')
type = request.POST.getlist('type')
active = request.POST.getlist('active')
@@ -298,10 +297,15 @@ def edit_question(request):
snippet = request.POST.getlist('snippet')
for j, question_id in enumerate(question_list):
question = Question.objects.get(id=question_id)
+ test_case_formset = TestCaseFormSet(request.POST, prefix='test', instance=question)
+ if test_case_formset.is_valid():
+ test_case_instance = test_case_formset.save(commit=False)
+ for i in test_case_instance:
+ i.save()
+
question.summary = summary[j]
question.description = description[j]
question.points = points[j]
- question.test = test[j]
question.options = options[j]
question.active = active[j]
question.language = language[j]
@@ -314,6 +318,16 @@ def edit_question(request):
def add_question(request, question_id=None):
"""To add a new question in the database.
Create a new question and store it."""
+
+ def add_or_delete_test_form(post_request, instance):
+ request_copy = post_request.copy()
+ if 'add_test' in post_request:
+ request_copy['test-TOTAL_FORMS'] = int(request_copy['test-TOTAL_FORMS']) + 1
+ elif 'delete_test' in post_request:
+ request_copy['test-TOTAL_FORMS'] = int(request_copy['test-TOTAL_FORMS']) - 1
+ test_case_formset = TestCaseFormSet(request_copy, prefix='test', instance=instance)
+ return test_case_formset
+
user = request.user
ci = RequestContext(request)
if not user.is_authenticated() or not is_moderator(user):
@@ -321,44 +335,84 @@ def add_question(request, question_id=None):
if request.method == "POST":
form = QuestionForm(request.POST)
if form.is_valid():
- data = form.cleaned_data
if question_id is None:
- form.save()
- question = Question.objects.order_by("-id")[0]
- tags = form['tags'].data.split(',')
- for i in range(0, len(tags)-1):
- tag = tags[i].strip()
- question.tags.add(tag)
- return my_redirect("/exam/manage/questions")
+ test_case_formset = add_or_delete_test_form(request.POST, form.save(commit=False))
+ if 'save_question' in request.POST:
+ qtn = form.save(commit=False)
+ test_case_formset = TestCaseFormSet(request.POST, prefix='test', instance=qtn)
+ form.save()
+ question = Question.objects.order_by("-id")[0]
+ tags = form['tags'].data.split(',')
+ for i in range(0, len(tags)-1):
+ tag = tags[i].strip()
+ question.tags.add(tag)
+ if test_case_formset.is_valid():
+ test_case_formset.save()
+ else:
+ return my_render_to_response('exam/add_question.html',
+ {'form': form,
+ 'formset': test_case_formset},
+ context_instance=ci)
+
+ return my_redirect("/exam/manage/questions")
+
+ return my_render_to_response('exam/add_question.html',
+ {'form': form,
+ 'formset': test_case_formset},
+ context_instance=ci)
+
else:
d = Question.objects.get(id=question_id)
- d.summary = form['summary'].data
- d.description = form['description'].data
- d.points = form['points'].data
- d.test = form['test'].data
- d.options = form['options'].data
- d.type = form['type'].data
- d.active = form['active'].data
- d.language = form['language'].data
- d.snippet = form['snippet'].data
- d.save()
- question = Question.objects.get(id=question_id)
- for tag in question.tags.all():
- question.tags.remove(tag)
- tags = form['tags'].data.split(',')
- for i in range(0, len(tags)-1):
- tag = tags[i].strip()
- question.tags.add(tag)
- return my_redirect("/exam/manage/questions")
+ test_case_formset = add_or_delete_test_form(request.POST, d)
+ if 'save_question' in request.POST:
+ d.summary = form['summary'].data
+ d.description = form['description'].data
+ d.points = form['points'].data
+ d.options = form['options'].data
+ d.type = form['type'].data
+ d.active = form['active'].data
+ d.language = form['language'].data
+ d.snippet = form['snippet'].data
+ d.save()
+ question = Question.objects.get(id=question_id)
+ for tag in question.tags.all():
+ question.tags.remove(tag)
+ tags = form['tags'].data.split(',')
+ for i in range(0, len(tags)-1):
+ tag = tags[i].strip()
+ question.tags.add(tag)
+
+ test_case_formset = TestCaseFormSet(request.POST, prefix='test', instance=question)
+ if test_case_formset.is_valid():
+ test_case_instance = test_case_formset.save(commit=False)
+ for i in test_case_instance:
+ i.save()
+ else:
+ return my_render_to_response('exam/add_question.html',
+ {'form': form,
+ 'formset': test_case_formset},
+ context_instance=ci)
+
+
+ return my_redirect("/exam/manage/questions")
+ return my_render_to_response('exam/add_question.html',
+ {'form': form,
+ 'formset': test_case_formset},
+ context_instance=ci)
+
else:
return my_render_to_response('exam/add_question.html',
{'form': form},
context_instance=ci)
else:
+ form = QuestionForm()
+ test_case_formset = TestCaseFormSet(prefix='test', instance=Question())
if question_id is None:
form = QuestionForm()
+ test_case_formset = TestCaseFormSet(prefix='test', instance=Question())
return my_render_to_response('exam/add_question.html',
- {'form': form},
+ {'form': form,
+ 'formset': test_case_formset},
context_instance=ci)
else:
d = Question.objects.get(id=question_id)
@@ -366,7 +420,6 @@ def add_question(request, question_id=None):
form.initial['summary'] = d.summary
form.initial['description'] = d.description
form.initial['points'] = d.points
- form.initial['test'] = d.test
form.initial['options'] = d.options
form.initial['type'] = d.type
form.initial['active'] = d.active
@@ -380,8 +433,13 @@ def add_question(request, question_id=None):
if (initial_tags == ","):
initial_tags = ""
form.initial['tags'] = initial_tags
+
+ test_case_formset = TestCaseFormSet(prefix='test',
+ instance=d)
+
return my_render_to_response('exam/add_question.html',
- {'form': form},
+ {'form': form,
+ 'formset': test_case_formset},
context_instance=ci)
@@ -1172,13 +1230,13 @@ def show_all_questions(request):
data = request.POST.getlist('question')
forms = []
+ formsets = []
for j in data:
d = Question.objects.get(id=j)
form = QuestionForm()
form.initial['summary'] = d.summary
form.initial['description'] = d.description
form.initial['points'] = d.points
- form.initial['test'] = d.test
form.initial['options'] = d.options
form.initial['type'] = d.type
form.initial['active'] = d.active
@@ -1193,8 +1251,13 @@ def show_all_questions(request):
initial_tags = ""
form.initial['tags'] = initial_tags
forms.append(form)
+ test_case_formset = TestCaseFormSet(prefix='test', instance=d)
+ formsets.append(test_case_formset)
+ data_list = zip(forms, formsets)
+
return my_render_to_response('exam/edit_question.html',
- {'forms': forms, 'data': data},
+ {'data': data,
+ 'data_list': data_list},
context_instance=ci)
else:
questions = Question.objects.all()
--
cgit
From 9440bff5ae69c1d27f5c9622ca15cb8c603c6174 Mon Sep 17 00:00:00 2001
From: ankitjavalkar
Date: Thu, 5 Mar 2015 12:27:02 +0530
Subject: Code Server code cleanup and code commonification - Pass question and
test case info as json string (info_parameter) - Return success status and
error message as a json string - Embed user answer and question lang in
info_parameter - Commonify Python code evaluations and assertion test -
Deprecate individual function call based on language
---
testapp/exam/code_server.py | 1521 ++++++++++++++-----------
testapp/exam/models.py | 14 +-
testapp/exam/templates/exam/add_question.html | 14 -
testapp/exam/views.py | 21 +-
testapp/exam/xmlrpc_clients.py | 22 +-
5 files changed, 904 insertions(+), 688 deletions(-)
(limited to 'testapp/exam')
diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py
index c34971f..fa5d916 100755
--- a/testapp/exam/code_server.py
+++ b/testapp/exam/code_server.py
@@ -29,6 +29,7 @@ import signal
from multiprocessing import Process, Queue
import subprocess
import re
+import json
# Local imports.
from settings import SERVER_PORTS, SERVER_TIMEOUT, SERVER_POOL_PORT
@@ -53,6 +54,131 @@ def timeout_handler(signum, frame):
"""A handler for the ALARM signal."""
raise TimeoutException('Code took too long to run.')
+def create_delete_signal_handler(action=None, old_handler=None):
+ if action == "new": # Add a new signal handler for the execution of this code.
+ prev_handler = signal.signal(signal.SIGALRM, timeout_handler)
+ signal.alarm(SERVER_TIMEOUT)
+ return prev_handler
+ elif action == "original": # Set back any original signal handler.
+ if old_handler is not None:
+ signal.signal(signal.SIGALRM, old_handler)
+ return
+ else:
+ raise Exception("Signal Handler: object cannot be NoneType")
+ elif action == "delete": # Cancel the signal if any, see signal.alarm documentation.
+ signal.alarm(0)
+ return
+ else:
+ raise Exception("Signal Handler: action not specified")
+
+
+###############################################################################
+# `TestCode` class.
+###############################################################################
+class TestCode(object):
+ """Evaluates and tests the code obtained from Code Server"""
+ def __init__(self, info_parameter, in_dir=None):
+ info_parameter = json.loads(info_parameter)
+ self.test_parameter = info_parameter.get("test_parameter")
+ self.language = info_parameter.get("language")
+ self.user_answer = info_parameter.get("user_answer")
+ self.in_dir = in_dir
+
+ def run_code(self):
+ self._change_dir(self.in_dir)
+
+ # Create test cases
+ test_code = self.create_test_case()
+ # Evaluate, run code and obtain result
+ result = self.evaluate_code(test_code)
+
+ return result
+
+ def evaluate_code(self, test_code):
+ success = False
+ prev_handler = None
+
+ if self.language == "python":
+ tb = None
+ prev_handler = create_delete_signal_handler("new")
+
+ try:
+ submitted = compile(self.user_answer, '', mode='exec')
+ g = {}
+ exec submitted in g
+ _tests = compile(test_code, '', mode='exec')
+ exec _tests in g
+ except TimeoutException:
+ err = self.timeout_msg
+ except AssertionError:
+ type, value, tb = sys.exc_info()
+ info = traceback.extract_tb(tb)
+ fname, lineno, func, text = info[-1]
+ text = str(test_code).splitlines()[lineno-1]
+ err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ else:
+ success = True
+ err = 'Correct answer'
+ finally:
+ del tb
+ # Set back any original signal handler.
+ create_delete_signal_handler("original", prev_handler)
+
+ # Cancel the signal
+ create_delete_signal_handler("delete")
+
+ if self.language == "c++":
+ pass
+
+ if self.language == "c":
+ pass
+
+ if self.language == "java":
+ pass
+
+ if self.language == "scilab":
+ pass
+
+ result = {'success': success, 'error': err}
+ return result
+
+ def compile_code(self):
+ pass
+
+ def create_test_case(self):
+ # Create assert based test cases in python
+ if self.language == "python":
+ test_code = ""
+ for test_case in self.test_parameter:
+ pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
+ else ""
+ kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
+ if test_case.get('kw_args') else ""
+ args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
+ tcode = "assert {0}({1}) == {2}" \
+ .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
+ test_code += tcode + "\n"
+ return test_code
+
+ if self.language == "c++":
+ pass
+
+ if self.language == "c":
+ pass
+
+ if self.language == "java":
+ pass
+
+ if self.language == "scilab":
+ pass
+
+ def _change_dir(self, in_dir):
+ if in_dir is not None and isdir(in_dir):
+ os.chdir(in_dir)
+
###############################################################################
# `CodeServer` class.
@@ -68,6 +194,16 @@ class CodeServer(object):
'have an infinite loop in your code.' % SERVER_TIMEOUT
self.timeout_msg = msg
+ def checker(self, info_parameter, in_dir=None):
+ """Calls the TestCode Class to test the current code"""
+ tc = TestCode(info_parameter, in_dir)
+ result = tc.run_code()
+
+ # Put us back into the server pool queue since we are free now.
+ self.queue.put(self.port)
+
+ return json.dumps(result)
+
def run_python_code(self, answer, test_parameter, in_dir=None):
"""Tests given Python function (`answer`) with the `test_code`
supplied. If the optional `in_dir` keyword argument is supplied
@@ -88,6 +224,7 @@ class CodeServer(object):
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(SERVER_TIMEOUT)
+ test_parameter = json.loads(test_parameter)
success = False
tb = None
@@ -135,659 +272,741 @@ class CodeServer(object):
result = {'success': success, 'error': err}
return result
- def run_bash_code(self, answer, test_parameter, in_dir=None):
- """Tests given Bash code (`answer`) with the `test_code` supplied.
-
- The testcode should typically contain two lines, the first is a path to
- the reference script we are to compare against. The second is a path
- to the arguments to be supplied to the reference and submitted script.
- The output of these will be compared for correctness.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- def _set_exec(fname):
- os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
- | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
- | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
- submit_f = open('submit.sh', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
- _set_exec(submit_path)
-
- # ref path and path to arguments is a comma seperated string obtained
- #from ref_code_path field in TestCase Mode
- path_list = test_parameter.get('ref_code_path').split(',')
- ref_path = path_list[0].strip() if path_list[0] else ""
- test_case_path = path_list[1].strip() if path_list[1] else ""
-
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
- if not test_case_path.startswith('/'):
- test_case_path = join(MY_DIR, test_case_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self.check_bash_script(ref_path, submit_path,
- test_case_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def _run_command(self, cmd_args, *args, **kw):
- """Run a command in a subprocess while blocking, the process is killed
- if it takes more than 2 seconds to run. Return the Popen object, the
- stdout and stderr.
- """
- try:
- proc = subprocess.Popen(cmd_args, *args, **kw)
- stdout, stderr = proc.communicate()
- except TimeoutException:
- # Runaway code, so kill it.
- proc.kill()
- # Re-raise exception.
- raise
- return proc, stdout, stderr
-
- def check_bash_script(self, ref_script_path, submit_script_path,
- test_case_path=None):
- """ Function validates student script using instructor script as
- reference. Test cases can optionally be provided. The first argument
- ref_script_path, is the path to instructor script, it is assumed to
- have executable permission. The second argument submit_script_path, is
- the path to the student script, it is assumed to have executable
- permission. The Third optional argument is the path to test the
- scripts. Each line in this file is a test case and each test case is
- passed to the script as standard arguments.
-
- Returns
- --------
-
- returns (True, "Correct answer") : If the student script passes all
- test cases/have same output, when compared to the instructor script
-
- returns (False, error_msg): If the student script fails a single
- test/have dissimilar output, when compared to the instructor script.
-
- Returns (False, error_msg): If mandatory arguments are not files or if
- the required permissions are not given to the file(s).
-
- """
- if not ref_script_path:
- return False, "No Test Script path found"
- if not isfile(ref_script_path):
- return False, "No file at %s" % ref_script_path
- if not isfile(submit_script_path):
- return False, 'No file at %s' % submit_script_path
- if not os.access(ref_script_path, os.X_OK):
- return False, 'Script %s is not executable' % ref_script_path
- if not os.access(submit_script_path, os.X_OK):
- return False, 'Script %s is not executable' % submit_script_path
-
- if test_case_path is None or "":
- ret = self._run_command(ref_script_path, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, inst_stdout, inst_stderr = ret
- ret = self._run_command(submit_script_path, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdnt_stdout, stdnt_stderr = ret
- if inst_stdout == stdnt_stdout:
- return True, 'Correct answer'
- else:
- err = "Error: expected %s, got %s" % (inst_stderr,
- stdnt_stderr)
- return False, err
- else:
- if not isfile(test_case_path):
- return False, "No test case at %s" % test_case_path
- if not os.access(ref_script_path, os.R_OK):
- return False, "Test script %s, not readable" % test_case_path
- valid_answer = True # We initially make it one, so that we can
- # stop once a test case fails
- loop_count = 0 # Loop count has to be greater than or
- # equal to one.
- # Useful for caching things like empty
- # test files,etc.
- test_cases = open(test_case_path).readlines()
- num_lines = len(test_cases)
- for test_case in test_cases:
- loop_count += 1
- if valid_answer:
- args = [ref_script_path] + [x for x in test_case.split()]
- ret = self._run_command(args, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, inst_stdout, inst_stderr = ret
- args = [submit_script_path]+[x for x in test_case.split()]
- ret = self._run_command(args, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdnt_stdout, stdnt_stderr = ret
- valid_answer = inst_stdout == stdnt_stdout
- if valid_answer and (num_lines == loop_count):
- return True, "Correct answer"
- else:
- err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
- stdnt_stdout+stdnt_stderr)
- return False, err
-
- def run_c_code(self, answer, test_parameter, in_dir=None):
- """Tests given C code (`answer`) with the `test_code` supplied.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # File extension must be .c
- submit_f = open('submit.c', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self._check_c_cpp_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def _compile_command(self, cmd, *args, **kw):
- """Compiles C/C++/java code and returns errors if any.
- Run a command in a subprocess while blocking, the process is killed
- if it takes more than 2 seconds to run. Return the Popen object, the
- stderr.
- """
- try:
- proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- out, err = proc_compile.communicate()
- except TimeoutException:
- # Runaway code, so kill it.
- proc_compile.kill()
- # Re-raise exception.
- raise
- return proc_compile, err
-
- def _check_c_cpp_code(self, ref_code_path, submit_code_path):
- """ Function validates student code using instructor code as
- reference.The first argument ref_code_path, is the path to
- instructor code, it is assumed to have executable permission.
- The second argument submit_code_path, is the path to the student
- code, it is assumed to have executable permission.
-
- Returns
- --------
-
- returns (True, "Correct answer") : If the student function returns
- expected output when called by reference code.
-
- returns (False, error_msg): If the student function fails to return
- expected output when called by reference code.
-
- Returns (False, error_msg): If mandatory arguments are not files or
- if the required permissions are not given to the file(s).
-
- """
- if not isfile(ref_code_path):
- return False, "No file at %s" % ref_code_path
- if not isfile(submit_code_path):
- return False, 'No file at %s' % submit_code_path
-
- success = False
- output_path = os.getcwd() + '/output'
- compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path)
- ret = self._compile_command(compile_command)
- proc, stdnt_stderr = ret
-
- # Only if compilation is successful, the program is executed
- # And tested with testcases
- if stdnt_stderr == '':
- executable = os.getcwd() + '/executable'
- compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path,
- executable)
- ret = self._compile_command(compile_main)
- proc, main_err = ret
- if main_err == '':
- args = [executable]
- ret = self._run_command(args, stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
- if proc.returncode == 0:
- success, err = True, "Correct answer"
- else:
- err = stdout + "\n" + stderr
- os.remove(executable)
- else:
- err = "Error:"
- try:
- error_lines = main_err.splitlines()
- for e in error_lines:
- err = err + "\n" + e.split(":", 1)[1]
- except:
- err = err + "\n" + main_err
- os.remove(output_path)
- else:
- err = "Compilation Error:"
- try:
- error_lines = stdnt_stderr.splitlines()
- for e in error_lines:
- if ':' in e:
- err = err + "\n" + e.split(":", 1)[1]
- else:
- err = err + "\n" + e
- except:
- err = err + "\n" + stdnt_stderr
- return success, err
-
- def run_cplus_code(self, answer, test_parameter, in_dir=None):
- """Tests given C++ code (`answer`) with the `test_code` supplied.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # The file extension must be .cpp
- submit_f = open('submitstd.cpp', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self._check_c_cpp_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def run_java_code(self, answer, test_parameter, in_dir=None):
- """Tests given java code (`answer`) with the `test_code` supplied.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- If the optional `in_dir` keyword argument is supplied it changes the
- directory to that directory (it does not change it back to the original
- when done).
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # The file extension must be .java
- # The class name and file name must be same in java
- submit_f = open('Test.java', 'w')
- submit_f.write(answer.lstrip())
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- success, err = self._check_java_code(ref_path, submit_path)
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- return success, err
-
- def _check_java_code(self, ref_code_path, submit_code_path):
- """ Function validates student code using instructor code as
- reference.The first argument ref_code_path, is the path to
- instructor code, it is assumed to have executable permission.
- The second argument submit_code_path, is the path to the student
- code, it is assumed to have executable permission.
-
- Returns
- --------
-
- returns (True, "Correct answer") : If the student function returns
- expected output when called by reference code.
-
- returns (False, error_msg): If the student function fails to return
- expected output when called by reference code.
-
- Returns (False, error_msg): If mandatory arguments are not files or
- if the required permissions are not given to the file(s).
-
- """
- if not isfile(ref_code_path):
- return False, "No file at %s" % ref_code_path
- if not isfile(submit_code_path):
- return False, 'No file at %s' % submit_code_path
-
- success = False
- compile_command = "javac %s" % (submit_code_path)
- ret = self._compile_command(compile_command)
- proc, stdnt_stderr = ret
- stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
-
- # Only if compilation is successful, the program is executed
- # And tested with testcases
- if stdnt_stderr == '':
- student_directory = os.getcwd() + '/'
- student_file_name = "Test"
- compile_main = "javac %s -classpath %s -d %s" % (ref_code_path,
- student_directory,
- student_directory)
- ret = self._compile_command(compile_main)
- proc, main_err = ret
- main_err = self._remove_null_substitute_char(main_err)
-
- if main_err == '':
- main_file_name = (ref_code_path.split('/')[-1]).split('.')[0]
- run_command = "java -cp %s %s" % (student_directory,
- main_file_name)
- ret = self._run_command(run_command,
- stdin=None,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
- if proc.returncode == 0:
- success, err = True, "Correct answer"
- else:
- err = stdout + "\n" + stderr
- success = False
- os.remove("%s%s.class" % (student_directory, main_file_name))
- else:
- err = "Error:\n"
- try:
- error_lines = main_err.splitlines()
- for e in error_lines:
- if ':' in e:
- err = err + "\n" + e.split(":", 1)[1]
- else:
- err = err + "\n" + e
- except:
- err = err + "\n" + main_err
- os.remove("%s%s.class" % (student_directory, student_file_name))
- else:
- err = "Compilation Error:\n"
- try:
- error_lines = stdnt_stderr.splitlines()
- for e in error_lines:
- if ':' in e:
- err = err + "\n" + e.split(":", 1)[1]
- else:
- err = err + "\n" + e
- except:
- err = err + "\n" + stdnt_stderr
- result = {'success': success, 'error': err}
- return result
-
- def _remove_null_substitute_char(self, string):
- """Returns a string without any null and substitute characters"""
- stripped = ""
- for c in string:
- if ord(c) is not 26 and ord(c) is not 0:
- stripped = stripped + c
- return ''.join(stripped)
+# ###############################################################################
+# # `CodeServer` class.
+# ###############################################################################
+# class CodeServer(object):
+# """A code server that executes user submitted test code, tests it and
+# reports if the code was correct or not.
+# """
+# def __init__(self, port, queue):
+# self.port = port
+# self.queue = queue
+# msg = 'Code took more than %s seconds to run. You probably '\
+# 'have an infinite loop in your code.' % SERVER_TIMEOUT
+# self.timeout_msg = msg
+
+# def run_python_code(self, answer, test_parameter, in_dir=None):
+# """Tests given Python function (`answer`) with the `test_code`
+# supplied. If the optional `in_dir` keyword argument is supplied
+# it changes the directory to that directory (it does not change
+# it back to the original when done). This function also timesout
+# when the function takes more than SERVER_TIMEOUT seconds to run
+# to prevent runaway code.
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# test_parameter = json.loads(test_parameter)
+# success = False
+# tb = None
+
+# test_code = ""
+# for test_case in test_parameter:
+# pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
+# else ""
+# kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
+# if test_case.get('kw_args') else ""
+# args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
+# tcode = "assert {0}({1}) == {2}" \
+# .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
+# test_code += tcode + "\n"
+# try:
+# submitted = compile(answer, '', mode='exec')
+# g = {}
+# exec submitted in g
+# _tests = compile(test_code, '', mode='exec')
+# exec _tests in g
+# except TimeoutException:
+# err = self.timeout_msg
+# except AssertionError:
+# type, value, tb = sys.exc_info()
+# info = traceback.extract_tb(tb)
+# fname, lineno, func, text = info[-1]
+# text = str(test_code).splitlines()[lineno-1]
+# err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# else:
+# success = True
+# err = 'Correct answer'
+# finally:
+# del tb
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def run_bash_code(self, answer, test_parameter, in_dir=None):
+# """Tests given Bash code (`answer`) with the `test_code` supplied.
+
+# The testcode should typically contain two lines, the first is a path to
+# the reference script we are to compare against. The second is a path
+# to the arguments to be supplied to the reference and submitted script.
+# The output of these will be compared for correctness.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# def _set_exec(fname):
+# os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
+# | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
+# | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
+# submit_f = open('submit.sh', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+# _set_exec(submit_path)
+
+# # ref path and path to arguments is a comma seperated string obtained
+# #from ref_code_path field in TestCase Mode
+# path_list = test_parameter.get('ref_code_path').split(',')
+# ref_path = path_list[0].strip() if path_list[0] else ""
+# test_case_path = path_list[1].strip() if path_list[1] else ""
+
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+# if not test_case_path.startswith('/'):
+# test_case_path = join(MY_DIR, test_case_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self.check_bash_script(ref_path, submit_path,
+# test_case_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def _run_command(self, cmd_args, *args, **kw):
+# """Run a command in a subprocess while blocking, the process is killed
+# if it takes more than 2 seconds to run. Return the Popen object, the
+# stdout and stderr.
+# """
+# try:
+# proc = subprocess.Popen(cmd_args, *args, **kw)
+# stdout, stderr = proc.communicate()
+# except TimeoutException:
+# # Runaway code, so kill it.
+# proc.kill()
+# # Re-raise exception.
+# raise
+# return proc, stdout, stderr
+
+# def check_bash_script(self, ref_script_path, submit_script_path,
+# test_case_path=None):
+# """ Function validates student script using instructor script as
+# reference. Test cases can optionally be provided. The first argument
+# ref_script_path, is the path to instructor script, it is assumed to
+# have executable permission. The second argument submit_script_path, is
+# the path to the student script, it is assumed to have executable
+# permission. The Third optional argument is the path to test the
+# scripts. Each line in this file is a test case and each test case is
+# passed to the script as standard arguments.
+
+# Returns
+# --------
+
+# returns (True, "Correct answer") : If the student script passes all
+# test cases/have same output, when compared to the instructor script
+
+# returns (False, error_msg): If the student script fails a single
+# test/have dissimilar output, when compared to the instructor script.
+
+# Returns (False, error_msg): If mandatory arguments are not files or if
+# the required permissions are not given to the file(s).
+
+# """
+# if not ref_script_path:
+# return False, "No Test Script path found"
+# if not isfile(ref_script_path):
+# return False, "No file at %s" % ref_script_path
+# if not isfile(submit_script_path):
+# return False, 'No file at %s' % submit_script_path
+# if not os.access(ref_script_path, os.X_OK):
+# return False, 'Script %s is not executable' % ref_script_path
+# if not os.access(submit_script_path, os.X_OK):
+# return False, 'Script %s is not executable' % submit_script_path
+
+# if test_case_path is None or "":
+# ret = self._run_command(ref_script_path, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, inst_stdout, inst_stderr = ret
+# ret = self._run_command(submit_script_path, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdnt_stdout, stdnt_stderr = ret
+# if inst_stdout == stdnt_stdout:
+# return True, 'Correct answer'
+# else:
+# err = "Error: expected %s, got %s" % (inst_stderr,
+# stdnt_stderr)
+# return False, err
+# else:
+# if not isfile(test_case_path):
+# return False, "No test case at %s" % test_case_path
+# if not os.access(ref_script_path, os.R_OK):
+# return False, "Test script %s, not readable" % test_case_path
+# valid_answer = True # We initially make it one, so that we can
+# # stop once a test case fails
+# loop_count = 0 # Loop count has to be greater than or
+# # equal to one.
+# # Useful for caching things like empty
+# # test files,etc.
+# test_cases = open(test_case_path).readlines()
+# num_lines = len(test_cases)
+# for test_case in test_cases:
+# loop_count += 1
+# if valid_answer:
+# args = [ref_script_path] + [x for x in test_case.split()]
+# ret = self._run_command(args, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, inst_stdout, inst_stderr = ret
+# args = [submit_script_path]+[x for x in test_case.split()]
+# ret = self._run_command(args, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdnt_stdout, stdnt_stderr = ret
+# valid_answer = inst_stdout == stdnt_stdout
+# if valid_answer and (num_lines == loop_count):
+# return True, "Correct answer"
+# else:
+# err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
+# stdnt_stdout+stdnt_stderr)
+# return False, err
+
+# def run_c_code(self, answer, test_parameter, in_dir=None):
+# """Tests given C code (`answer`) with the `test_code` supplied.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # File extension must be .c
+# submit_f = open('submit.c', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self._check_c_cpp_code(ref_path, submit_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def _compile_command(self, cmd, *args, **kw):
+# """Compiles C/C++/java code and returns errors if any.
+# Run a command in a subprocess while blocking, the process is killed
+# if it takes more than 2 seconds to run. Return the Popen object, the
+# stderr.
+# """
+# try:
+# proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# out, err = proc_compile.communicate()
+# except TimeoutException:
+# # Runaway code, so kill it.
+# proc_compile.kill()
+# # Re-raise exception.
+# raise
+# return proc_compile, err
+
+# def _check_c_cpp_code(self, ref_code_path, submit_code_path):
+# """ Function validates student code using instructor code as
+# reference.The first argument ref_code_path, is the path to
+# instructor code, it is assumed to have executable permission.
+# The second argument submit_code_path, is the path to the student
+# code, it is assumed to have executable permission.
+
+# Returns
+# --------
+
+# returns (True, "Correct answer") : If the student function returns
+# expected output when called by reference code.
+
+# returns (False, error_msg): If the student function fails to return
+# expected output when called by reference code.
+
+# Returns (False, error_msg): If mandatory arguments are not files or
+# if the required permissions are not given to the file(s).
+
+# """
+# if not isfile(ref_code_path):
+# return False, "No file at %s" % ref_code_path
+# if not isfile(submit_code_path):
+# return False, 'No file at %s' % submit_code_path
+
+# success = False
+# output_path = os.getcwd() + '/output'
+# compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path)
+# ret = self._compile_command(compile_command)
+# proc, stdnt_stderr = ret
+
+# # Only if compilation is successful, the program is executed
+# # And tested with testcases
+# if stdnt_stderr == '':
+# executable = os.getcwd() + '/executable'
+# compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path,
+# executable)
+# ret = self._compile_command(compile_main)
+# proc, main_err = ret
+# if main_err == '':
+# args = [executable]
+# ret = self._run_command(args, stdin=None,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdout, stderr = ret
+# if proc.returncode == 0:
+# success, err = True, "Correct answer"
+# else:
+# err = stdout + "\n" + stderr
+# os.remove(executable)
+# else:
+# err = "Error:"
+# try:
+# error_lines = main_err.splitlines()
+# for e in error_lines:
+# err = err + "\n" + e.split(":", 1)[1]
+# except:
+# err = err + "\n" + main_err
+# os.remove(output_path)
+# else:
+# err = "Compilation Error:"
+# try:
+# error_lines = stdnt_stderr.splitlines()
+# for e in error_lines:
+# if ':' in e:
+# err = err + "\n" + e.split(":", 1)[1]
+# else:
+# err = err + "\n" + e
+# except:
+# err = err + "\n" + stdnt_stderr
+# return success, err
+
+# def run_cplus_code(self, answer, test_parameter, in_dir=None):
+# """Tests given C++ code (`answer`) with the `test_code` supplied.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # The file extension must be .cpp
+# submit_f = open('submitstd.cpp', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self._check_c_cpp_code(ref_path, submit_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def run_java_code(self, answer, test_parameter, in_dir=None):
+# """Tests given java code (`answer`) with the `test_code` supplied.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# If the optional `in_dir` keyword argument is supplied it changes the
+# directory to that directory (it does not change it back to the original
+# when done).
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
+
+# # The file extension must be .java
+# # The class name and file name must be same in java
+# submit_f = open('Test.java', 'w')
+# submit_f.write(answer.lstrip())
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# success, err = self._check_java_code(ref_path, submit_path)
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# return success, err
+
+# def _check_java_code(self, ref_code_path, submit_code_path):
+# """ Function validates student code using instructor code as
+# reference.The first argument ref_code_path, is the path to
+# instructor code, it is assumed to have executable permission.
+# The second argument submit_code_path, is the path to the student
+# code, it is assumed to have executable permission.
+
+# Returns
+# --------
+
+# returns (True, "Correct answer") : If the student function returns
+# expected output when called by reference code.
+
+# returns (False, error_msg): If the student function fails to return
+# expected output when called by reference code.
+
+# Returns (False, error_msg): If mandatory arguments are not files or
+# if the required permissions are not given to the file(s).
+
+# """
+# if not isfile(ref_code_path):
+# return False, "No file at %s" % ref_code_path
+# if not isfile(submit_code_path):
+# return False, 'No file at %s' % submit_code_path
+
+# success = False
+# compile_command = "javac %s" % (submit_code_path)
+# ret = self._compile_command(compile_command)
+# proc, stdnt_stderr = ret
+# stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
+
+# # Only if compilation is successful, the program is executed
+# # And tested with testcases
+# if stdnt_stderr == '':
+# student_directory = os.getcwd() + '/'
+# student_file_name = "Test"
+# compile_main = "javac %s -classpath %s -d %s" % (ref_code_path,
+# student_directory,
+# student_directory)
+# ret = self._compile_command(compile_main)
+# proc, main_err = ret
+# main_err = self._remove_null_substitute_char(main_err)
+
+# if main_err == '':
+# main_file_name = (ref_code_path.split('/')[-1]).split('.')[0]
+# run_command = "java -cp %s %s" % (student_directory,
+# main_file_name)
+# ret = self._run_command(run_command,
+# stdin=None,
+# shell=True,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdout, stderr = ret
+# if proc.returncode == 0:
+# success, err = True, "Correct answer"
+# else:
+# err = stdout + "\n" + stderr
+# success = False
+# os.remove("%s%s.class" % (student_directory, main_file_name))
+# else:
+# err = "Error:\n"
+# try:
+# error_lines = main_err.splitlines()
+# for e in error_lines:
+# if ':' in e:
+# err = err + "\n" + e.split(":", 1)[1]
+# else:
+# err = err + "\n" + e
+# except:
+# err = err + "\n" + main_err
+# os.remove("%s%s.class" % (student_directory, student_file_name))
+# else:
+# err = "Compilation Error:\n"
+# try:
+# error_lines = stdnt_stderr.splitlines()
+# for e in error_lines:
+# if ':' in e:
+# err = err + "\n" + e.split(":", 1)[1]
+# else:
+# err = err + "\n" + e
+# except:
+# err = err + "\n" + stdnt_stderr
+# result = {'success': success, 'error': err}
+# return result
+
+# def _remove_null_substitute_char(self, string):
+# """Returns a string without any null and substitute characters"""
+# stripped = ""
+# for c in string:
+# if ord(c) is not 26 and ord(c) is not 0:
+# stripped = stripped + c
+# return ''.join(stripped)
- def run_scilab_code(self, answer, test_parameter, in_dir=None):
- """Tests given Scilab function (`answer`) with the `test_code`
- supplied. If the optional `in_dir` keyword argument is supplied
- it changes the directory to that directory (it does not change
- it back to the original when done). This function also timesout
- when the function takes more than SERVER_TIMEOUT seconds to run
- to prevent runaway code.
-
- The testcode is a path to the reference code.
- The reference code will call the function submitted by the student.
- The reference code will check for the expected output.
-
- If the path's start with a "/" then we assume they are absolute paths.
- If not, we assume they are relative paths w.r.t. the location of this
- code_server script.
-
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
+# def run_scilab_code(self, answer, test_parameter, in_dir=None):
+# """Tests given Scilab function (`answer`) with the `test_code`
+# supplied. If the optional `in_dir` keyword argument is supplied
+# it changes the directory to that directory (it does not change
+# it back to the original when done). This function also timesout
+# when the function takes more than SERVER_TIMEOUT seconds to run
+# to prevent runaway code.
+
+# The testcode is a path to the reference code.
+# The reference code will call the function submitted by the student.
+# The reference code will check for the expected output.
+
+# If the path's start with a "/" then we assume they are absolute paths.
+# If not, we assume they are relative paths w.r.t. the location of this
+# code_server script.
+
+# Returns
+# -------
+
+# A tuple: (success, error message).
+
+# """
+# if in_dir is not None and isdir(in_dir):
+# os.chdir(in_dir)
- # Removes all the commands that terminates scilab
- answer,i = self._remove_scilab_exit(answer.lstrip())
-
- # Throw message if there are commmands that terminates scilab
- add_err=""
- if i > 0:
- add_err = "Please do not use exit, quit and abort commands in your\
- code.\n Otherwise your code will not be evaluated\
- correctly.\n"
-
- # The file extension should be .sci
- submit_f = open('function.sci','w')
- submit_f.write(answer)
- submit_f.close()
- submit_path = abspath(submit_f.name)
-
- ref_path = test_parameter.get('ref_code_path').strip()
- if not ref_path.startswith('/'):
- ref_path = join(MY_DIR, ref_path)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- # Do whatever testing needed.
- success = False
- try:
- cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
- cmd += ' | timeout 8 scilab-cli -nb'
- ret = self._run_command(cmd,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc, stdout, stderr = ret
-
- # Get only the error.
- stderr = self._get_error(stdout)
- if stderr is None:
- # Clean output
- stdout = self._strip_output(stdout)
- if proc.returncode == 5:
- success, err = True, "Correct answer"
- else:
- err = add_err + stdout
- else:
- err = add_err + stderr
- except TimeoutException:
- err = self.timeout_msg
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- finally:
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Delete the created file.
- os.remove(submit_path)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
- def _remove_scilab_exit(self, string):
- """
- Removes exit, quit and abort from the scilab code
- """
- new_string = ""
- i=0
- for line in string.splitlines():
- new_line = re.sub(r"exit.*$","",line)
- new_line = re.sub(r"quit.*$","",new_line)
- new_line = re.sub(r"abort.*$","",new_line)
- if line != new_line:
- i=i+1
- new_string = new_string +'\n'+ new_line
- return new_string, i
+# # Removes all the commands that terminates scilab
+# answer,i = self._remove_scilab_exit(answer.lstrip())
+
+# # Throw message if there are commmands that terminates scilab
+# add_err=""
+# if i > 0:
+# add_err = "Please do not use exit, quit and abort commands in your\
+# code.\n Otherwise your code will not be evaluated\
+# correctly.\n"
+
+# # The file extension should be .sci
+# submit_f = open('function.sci','w')
+# submit_f.write(answer)
+# submit_f.close()
+# submit_path = abspath(submit_f.name)
+
+# ref_path = test_parameter.get('ref_code_path').strip()
+# if not ref_path.startswith('/'):
+# ref_path = join(MY_DIR, ref_path)
+
+# # Add a new signal handler for the execution of this code.
+# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
+# signal.alarm(SERVER_TIMEOUT)
+
+# # Do whatever testing needed.
+# success = False
+# try:
+# cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
+# cmd += ' | timeout 8 scilab-cli -nb'
+# ret = self._run_command(cmd,
+# shell=True,
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE)
+# proc, stdout, stderr = ret
+
+# # Get only the error.
+# stderr = self._get_error(stdout)
+# if stderr is None:
+# # Clean output
+# stdout = self._strip_output(stdout)
+# if proc.returncode == 5:
+# success, err = True, "Correct answer"
+# else:
+# err = add_err + stdout
+# else:
+# err = add_err + stderr
+# except TimeoutException:
+# err = self.timeout_msg
+# except:
+# type, value = sys.exc_info()[:2]
+# err = "Error: {0}".format(repr(value))
+# finally:
+# # Set back any original signal handler.
+# signal.signal(signal.SIGALRM, old_handler)
+
+# # Delete the created file.
+# os.remove(submit_path)
+
+# # Cancel the signal if any, see signal.alarm documentation.
+# signal.alarm(0)
+
+# # Put us back into the server pool queue since we are free now.
+# self.queue.put(self.port)
+
+# result = {'success': success, 'error': err}
+# return result
+
+# def _remove_scilab_exit(self, string):
+# """
+# Removes exit, quit and abort from the scilab code
+# """
+# new_string = ""
+# i=0
+# for line in string.splitlines():
+# new_line = re.sub(r"exit.*$","",line)
+# new_line = re.sub(r"quit.*$","",new_line)
+# new_line = re.sub(r"abort.*$","",new_line)
+# if line != new_line:
+# i=i+1
+# new_string = new_string +'\n'+ new_line
+# return new_string, i
def _get_error(self, string):
"""
diff --git a/testapp/exam/models.py b/testapp/exam/models.py
index ea79ad2..dd0c806 100644
--- a/testapp/exam/models.py
+++ b/testapp/exam/models.py
@@ -1,4 +1,5 @@
import datetime
+import json
from random import sample, shuffle
from django.db import models
from django.contrib.auth.models import User
@@ -82,8 +83,9 @@ class Question(models.Model):
# Tags for the Question.
tags = TaggableManager()
- def consolidate_test_cases(self, test):
- test_case_parameter= []
+ def consolidate_answer_data(self, test, user_answer):
+ test_case_parameter = []
+ info_parameter = {}
for i in test:
kw_args_dict = {}
@@ -94,7 +96,6 @@ class Question(models.Model):
parameter_dict['func_name'] = i.func_name
parameter_dict['expected_answer'] = i.expected_answer
parameter_dict['ref_code_path'] = i.ref_code_path
- parameter_dict['language'] = self.language
if i.kw_args:
for args in i.kw_args.split(","):
@@ -109,7 +110,12 @@ class Question(models.Model):
parameter_dict['pos_args'] = pos_args_list
test_case_parameter.append(parameter_dict)
- return test_case_parameter
+ info_parameter['language'] = self.language
+ info_parameter['id'] = self.id
+ info_parameter['user_answer'] = user_answer
+ info_parameter['test_parameter'] = test_case_parameter
+
+ return json.dumps(info_parameter)
def __unicode__(self):
return self.summary
diff --git a/testapp/exam/templates/exam/add_question.html b/testapp/exam/templates/exam/add_question.html
index d989b81..e5e2574 100644
--- a/testapp/exam/templates/exam/add_question.html
+++ b/testapp/exam/templates/exam/add_question.html
@@ -40,20 +40,6 @@
{% endfor %}
{% endif %}
-
-
diff --git a/testapp/exam/views.py b/testapp/exam/views.py
index 3d22d55..8aef5ab 100644
--- a/testapp/exam/views.py
+++ b/testapp/exam/views.py
@@ -14,6 +14,7 @@ from django.db.models import Sum
from django.views.decorators.csrf import csrf_exempt
from taggit.models import Tag
from itertools import chain
+import json
# Local imports.
from testapp.exam.models import Quiz, Question, QuestionPaper, QuestionSet
from testapp.exam.models import Profile, Answer, AnswerPaper, User, TestCase
@@ -909,7 +910,6 @@ def check(request, q_id, attempt_num=None, questionpaper_id=None):
q_paper = QuestionPaper.objects.get(id=questionpaper_id)
paper = AnswerPaper.objects.get(user=request.user, question_paper=q_paper)
test = TestCase.objects.filter(question=question)
- test_parameter = question.consolidate_test_cases(test)
snippet_code = request.POST.get('snippet')
user_code = request.POST.get('answer')
@@ -940,7 +940,7 @@ def check(request, q_id, attempt_num=None, questionpaper_id=None):
user_answer = 'ASSIGNMENT UPLOADED'
else:
user_code = request.POST.get('answer')
- user_answer = user_code #snippet_code + "\n" + user_code
+ user_answer = snippet_code + "\n" + user_code if snippet_code else user_code
new_answer = Answer(question=question, answer=user_answer,
correct=False)
@@ -951,14 +951,16 @@ def check(request, q_id, attempt_num=None, questionpaper_id=None):
# questions, we obtain the results via XML-RPC with the code executed
# safely in a separate process (the code_server.py) running as nobody.
if not question.type == 'upload':
- correct, result = validate_answer(user, user_answer, question, test_parameter)
+ if question.type == 'code':
+ info_parameter = question.consolidate_answer_data(test, user_answer)
+ correct, result = validate_answer(user, user_answer, question, info_parameter)
if correct:
new_answer.correct = correct
new_answer.marks = question.points
- new_answer.error = err_msg
+ new_answer.error = result.get('error')
success_msg = True
else:
- new_answer.error = err_msg
+ new_answer.error = result.get('error')
new_answer.save()
time_left = paper.time_left()
@@ -996,7 +998,7 @@ def check(request, q_id, attempt_num=None, questionpaper_id=None):
questionpaper_id, success_msg)
-def validate_answer(user, user_answer, question, test_parameter):
+def validate_answer(user, user_answer, question, info_parameter=None):
"""
Checks whether the answer submitted by the user is right or wrong.
If right then returns correct = True, success and
@@ -1011,17 +1013,18 @@ def validate_answer(user, user_answer, question, test_parameter):
if user_answer is not None:
if question.type == 'mcq':
- if user_answer.strip() == question.test.strip():
+ if user_answer.strip() == question.test.strip(): ####add question.answer/question.solution instead of test
correct = True
message = 'Correct answer'
elif question.type == 'mcc':
- answers = set(question.test.splitlines())
+ answers = set(question.test.splitlines()) ####add question.answer/question.solution instead of test
if set(user_answer) == answers:
correct = True
message = 'Correct answer'
elif question.type == 'code':
user_dir = get_user_dir(user)
- result = code_server.run_code(user_answer, test_parameter, user_dir, question.language)
+ json_result = code_server.run_code(info_parameter, user_dir)
+ result = json.loads(json_result)
if result.get('success'):
correct = True
diff --git a/testapp/exam/xmlrpc_clients.py b/testapp/exam/xmlrpc_clients.py
index 692fbb5..bbd6110 100644
--- a/testapp/exam/xmlrpc_clients.py
+++ b/testapp/exam/xmlrpc_clients.py
@@ -29,7 +29,7 @@ class CodeServerProxy(object):
"scilab": "run_scilab_code",
}
- def run_code(self, answer, test_parameter, user_dir, language):
+ def run_code(self, info_parameter, user_dir):
"""Tests given code (`answer`) with the `test_code` supplied. If the
optional `in_dir` keyword argument is supplied it changes the directory
to that directory (it does not change it back to the original when
@@ -38,26 +38,28 @@ class CodeServerProxy(object):
Parameters
----------
- answer : str
- The user's answer for the question.
+ info_parameter contains;
+ user_answer : str
+ The user's answer for the question.
test_code : str
The test code to check the user code with.
- user_dir : str (directory)
- The directory to run the tests inside.
language : str
The programming language to use.
+ user_dir : str (directory)
+ The directory to run the tests inside.
+
+
Returns
-------
- A tuple: (success, error message).
+ A json string of a dict: {success: success, err: error message}.
"""
- method_name = self.methods[language]
+ # method_name = self.methods[language]
try:
server = self._get_server()
- method = getattr(server, method_name)
- result = method(answer, test_parameter, user_dir) ####
+ result = server.checker(info_parameter, user_dir)
except ConnectionError:
- result = {'success': False, 'error': 'Unable to connect to any code servers!'}
+ result = json.dumps({'success': False, 'error': 'Unable to connect to any code servers!'})
return result
def _get_server(self):
--
cgit
From a88e59fa419d7ab31aa430e7424ff0a25169713f Mon Sep 17 00:00:00 2001
From: ankitjavalkar
Date: Wed, 18 Mar 2015 11:26:38 +0530
Subject: Code server code cleanup and code commonification - Commonify C,
C++, Java and Scilab code evaluation
---
testapp/exam/code_server.py | 1246 ++++++++++++++-----------------------------
1 file changed, 389 insertions(+), 857 deletions(-)
(limited to 'testapp/exam')
diff --git a/testapp/exam/code_server.py b/testapp/exam/code_server.py
index fa5d916..2259ce8 100755
--- a/testapp/exam/code_server.py
+++ b/testapp/exam/code_server.py
@@ -85,21 +85,34 @@ class TestCode(object):
self.in_dir = in_dir
def run_code(self):
- self._change_dir(self.in_dir)
+ """Tests given code (`answer`) with the `test_code` supplied.
- # Create test cases
- test_code = self.create_test_case()
- # Evaluate, run code and obtain result
- result = self.evaluate_code(test_code)
+ The ref_code_path is a path to the reference code.
+ The reference code will call the function submitted by the student.
+ The reference code will check for the expected output.
- return result
+ If the path's start with a "/" then we assume they are absolute paths.
+ If not, we assume they are relative paths w.r.t. the location of this
+ code_server script.
+
+ If the optional `in_dir` keyword argument is supplied it changes the
+ directory to that directory (it does not change it back to the original
+ when done).
+
+ Returns
+ -------
- def evaluate_code(self, test_code):
+ A tuple: (success, error message).
+ """
success = False
prev_handler = None
+ self._change_dir(self.in_dir)
+
if self.language == "python":
tb = None
+ test_code = self.create_test_case()
+ # Add a new signal handler for the execution of this code.
prev_handler = create_delete_signal_handler("new")
try:
@@ -130,26 +143,383 @@ class TestCode(object):
# Cancel the signal
create_delete_signal_handler("delete")
- if self.language == "c++":
- pass
+ else:
+ user_answer_file = {'C': 'submit.c', 'java': 'Test.java', 'scilab': 'function.sci',
+ 'C++': 'submitstd.cpp', 'bash': 'submit.sh'}
+
+ # File extension depending on the question language
+ submit_f = open(user_answer_file.get(self.language), 'w')
+ submit_f.write(self.user_answer.lstrip())
+ submit_f.close()
+ submit_path = abspath(submit_f.name)
+ if self.language == "bash":
+ self._set_exec(submit_path)
+
+ path_list = self.ref_code_path.split(',')
+ ref_path = path_list[0].strip() if path_list[0] else ""
+ test_case_path = path_list[1].strip() if path_list[1] else ""
+ if ref_path and not ref_path.startswith('/'):
+ ref_path = join(MY_DIR, ref_path)
+ if test_case_path and not test_case_path.startswith('/'):
+ test_case_path = join(MY_DIR, test_case_path)
+
+ if self.language in ["C++", "C", "java"]:
+ # Add a new signal handler for the execution of this code.
+ prev_handler = create_delete_signal_handler("new")
+
+ # Do whatever testing needed.
+ try:
+ success, err = self._check_code(ref_path, submit_path)
+ except TimeoutException:
+ err = self.timeout_msg
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ finally:
+ # # Set back any original signal handler.
+ create_delete_signal_handler("original", prev_handler)
+
+ elif self.language == "scilab":
+ # Add a new signal handler for the execution of this code.
+ prev_handler = create_delete_signal_handler("new")
+
+ # Do whatever testing needed.
+ try:
+ cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
+ cmd += ' | timeout 8 scilab-cli -nb'
+ ret = self._run_command(cmd,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, stdout, stderr = ret
+
+ # Get only the error.
+ stderr = self._get_error(stdout)
+ if stderr is None:
+ # Clean output
+ stdout = self._strip_output(stdout)
+ if proc.returncode == 5:
+ success, err = True, "Correct answer"
+ else:
+ err = add_err + stdout
+ else:
+ err = add_err + stderr
+ except TimeoutException:
+ err = self.timeout_msg
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ finally:
+ # Set back any original signal handler.
+ create_delete_signal_handler("original", prev_handler)
+
+ elif self.language == "bash":
+ # Add a new signal handler for the execution of this code.
+ prev_handler = create_delete_signal_handler("new")
+
+ try:
+ success, err = self.check_bash_script(ref_path, submit_path,
+ test_case_path)
+ except TimeoutException:
+ err = self.timeout_msg
+ except:
+ type, value = sys.exc_info()[:2]
+ err = "Error: {0}".format(repr(value))
+ finally:
+ # Set back any original signal handler.
+ create_delete_signal_handler("original", prev_handler)
+
+ # Delete the created file.
+ os.remove(submit_path)
- if self.language == "c":
- pass
+ # Cancel the signal
+ create_delete_signal_handler("delete")
- if self.language == "java":
- pass
+ result = {'success': success, 'error': err}
+ return result
- if self.language == "scilab":
- pass
+ def _check_code(self, ref_code_path, submit_code_path):
+ """ Function validates student code using instructor code as
+ reference.The first argument ref_code_path, is the path to
+ instructor code, it is assumed to have executable permission.
+ The second argument submit_code_path, is the path to the student
+ code, it is assumed to have executable permission.
+
+ Returns
+ --------
+
+ returns (True, "Correct answer") : If the student function returns
+ expected output when called by reference code.
+
+ returns (False, error_msg): If the student function fails to return
+ expected output when called by reference code.
+
+ Returns (False, error_msg): If mandatory arguments are not files or
+ if the required permissions are not given to the file(s).
+
+ """
+
+ language_dependent_path = {
+ 'c_user_output_path': os.getcwd() + '/output',
+ 'c_ref_output_path': os.getcwd() + '/executable',
+ 'java_student_directory': os.getcwd() + '/',
+ # 'java_student_file_name': 'Test',
+ 'java_ref_file_name': (ref_code_path.split('/')[-1]).split('.')[0],
+ }
+
+ language_dependent_var = {
+ 'C': {'compile_command': 'g++ {0} -c -o {1}'.format(submit_code_path,
+ language_dependent_path.get('c_user_output_path')),
+ 'compile_main': 'g++ {0} {1} -o {2}'.format(ref_code_path,
+ language_dependent_path.get('c_user_output_path'),
+ language_dependent_path.get('c_ref_output_path')),
+ 'run_command_args': [language_dependent_path.get('c_ref_output_path')],
+ 'remove_user_output': language_dependent_path.get('c_user_output_path'),
+ 'remove_ref_output': language_dependent_path.get('c_ref_output_path')
+ },
+ 'java':{'compile_command': 'javac {0}'.format(submit_code_path),
+ 'compile_main': 'javac {0} -classpath {1} -d {2}'.format(ref_code_path,
+ language_dependent_path.get('java_student_directory'),
+ language_dependent_path.get('java_student_directory')),
+ 'run_command_args': "java -cp {0} {1}".format(
+ language_dependent_path.get('java_student_directory'),
+ language_dependent_path.get('java_ref_file_name')),
+ 'remove_user_output': "%s%s.class".format(
+ language_dependent_path.get('java_student_directory'),
+ 'Test'),
+ 'remove_ref_output': "%s%s.class".format(
+ language_dependent_path.get('java_student_directory'),
+ language_dependent_path.get('java_ref_file_name')),
+ }
+ }
+
+ if not isfile(ref_code_path):
+ return False, "No file at %s or Incorrect path" % ref_code_path
+ if not isfile(submit_code_path):
+ return False, 'No file at %s or Incorrect path' % submit_code_path
+
+ success = False
+ # output_path = os.getcwd() + '/output'
+ compile_command = language_dependent_var.get(self.language).get('compile_command') # "g++ %s -c -o %s" % (submit_code_path, output_path)
+ ret = self._compile_command(compile_command)
+ proc, stdnt_stderr = ret
+ if self.language == "java":
+ stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
+
+ # Only if compilation is successful, the program is executed
+ # And tested with testcases
+ if stdnt_stderr == '':
+ compile_main = language_dependent_var.get(self.language).get('compile_main') # "g++ %s %s -o %s" % (ref_code_path, output_path, executable)
+ ret = self._compile_command(compile_main)
+ proc, main_err = ret
+ if self.language == "java":
+ main_err = self._remove_null_substitute_char(main_err)
+
+ if main_err == '':
+ run_command_args = language_dependent_var.get(self.language).get('run_command_args')
+ ret = self._run_command(run_command_args, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, stdout, stderr = ret
+ if proc.returncode == 0:
+ success, err = True, "Correct answer"
+ else:
+ err = stdout + "\n" + stderr
+ os.remove(language_dependent_var.get(self.language).get('remove_ref_output')) # os.remove(executable)
+ else:
+ err = "Error:"
+ try:
+ error_lines = main_err.splitlines()
+ for e in error_lines:
+ if ':' in e:
+ err = err + "\n" + e.split(":", 1)[1]
+ else:
+ err = err + "\n" + e
+ except:
+ err = err + "\n" + main_err
+ os.remove(language_dependent_var.get(self.language).get('remove_user_output')) # os.remove(output_path)
+ else:
+ err = "Compilation Error:"
+ try:
+ error_lines = stdnt_stderr.splitlines()
+ for e in error_lines:
+ if ':' in e:
+ err = err + "\n" + e.split(":", 1)[1]
+ else:
+ err = err + "\n" + e
+ except:
+ err = err + "\n" + stdnt_stderr
result = {'success': success, 'error': err}
return result
- def compile_code(self):
- pass
+ def check_bash_script(self, ref_path, submit_path,
+ test_case_path=None):
+ """ Function validates student script using instructor script as
+ reference. Test cases can optionally be provided. The first argument
+ ref_path, is the path to instructor script, it is assumed to
+ have executable permission. The second argument submit_path, is
+ the path to the student script, it is assumed to have executable
+ permission. The Third optional argument is the path to test the
+ scripts. Each line in this file is a test case and each test case is
+ passed to the script as standard arguments.
+
+ Returns
+ --------
+
+ returns (True, "Correct answer") : If the student script passes all
+ test cases/have same output, when compared to the instructor script
+
+ returns (False, error_msg): If the student script fails a single
+ test/have dissimilar output, when compared to the instructor script.
+
+ Returns (False, error_msg): If mandatory arguments are not files or if
+ the required permissions are not given to the file(s).
+
+ """
+ if not isfile(ref_path):
+ return False, "No file at %s or Incorrect path" % ref_path
+ if not isfile(submit_path):
+ return False, "No file at %s or Incorrect path" % submit_path
+ if not os.access(ref_path, os.X_OK):
+ return False, "Script %s is not executable" % ref_path
+ if not os.access(submit_path, os.X_OK):
+ return False, "Script %s is not executable" % submit_path
+
+ if test_case_path is None or "":
+ ret = self._run_command(ref_path, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, inst_stdout, inst_stderr = ret
+ ret = self._run_command(submit_path, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, stdnt_stdout, stdnt_stderr = ret
+ if inst_stdout == stdnt_stdout:
+ return True, "Correct answer"
+ else:
+ err = "Error: expected %s, got %s" % (inst_stderr,
+ stdnt_stderr)
+ return False, err
+ else:
+ if not isfile(test_case_path):
+ return False, "No test case at %s" % test_case_path
+ if not os.access(ref_path, os.R_OK):
+ return False, "Test script %s, not readable" % test_case_path
+ valid_answer = True # We initially make it one, so that we can
+ # stop once a test case fails
+ loop_count = 0 # Loop count has to be greater than or
+ # equal to one.
+ # Useful for caching things like empty
+ # test files,etc.
+ test_cases = open(test_case_path).readlines()
+ num_lines = len(test_cases)
+ for test_case in test_cases:
+ loop_count += 1
+ if valid_answer:
+ args = [ref_path] + [x for x in test_case.split()]
+ ret = self._run_command(args, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, inst_stdout, inst_stderr = ret
+ args = [submit_path]+[x for x in test_case.split()]
+ ret = self._run_command(args, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc, stdnt_stdout, stdnt_stderr = ret
+ valid_answer = inst_stdout == stdnt_stdout
+ if valid_answer and (num_lines == loop_count):
+ return True, "Correct answer"
+ else:
+ err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
+ stdnt_stdout+stdnt_stderr)
+ return False, err
+
+ def _run_command(self, cmd_args, *args, **kw):
+ """Run a command in a subprocess while blocking, the process is killed
+ if it takes more than 2 seconds to run. Return the Popen object, the
+ stdout and stderr.
+ """
+ try:
+ proc = subprocess.Popen(cmd_args, *args, **kw)
+ stdout, stderr = proc.communicate()
+ except TimeoutException:
+ # Runaway code, so kill it.
+ proc.kill()
+ # Re-raise exception.
+ raise
+ return proc, stdout, stderr
+
+ def _compile_command(self, cmd, *args, **kw):
+ """Compiles C/C++/java code and returns errors if any.
+ Run a command in a subprocess while blocking, the process is killed
+ if it takes more than 2 seconds to run. Return the Popen object, the
+ stderr.
+ """
+ try:
+ proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = proc_compile.communicate()
+ except TimeoutException:
+ # Runaway code, so kill it.
+ proc_compile.kill()
+ # Re-raise exception.
+ raise
+ return proc_compile, err
+
+ def _remove_null_substitute_char(self, string):
+ """Returns a string without any null and substitute characters"""
+ stripped = ""
+ for c in string:
+ if ord(c) is not 26 and ord(c) is not 0:
+ stripped = stripped + c
+ return ''.join(stripped)
+
+ def _remove_scilab_exit(self, string):
+ """
+ Removes exit, quit and abort from the scilab code
+ """
+ new_string = ""
+ i=0
+ for line in string.splitlines():
+ new_line = re.sub(r"exit.*$","",line)
+ new_line = re.sub(r"quit.*$","",new_line)
+ new_line = re.sub(r"abort.*$","",new_line)
+ if line != new_line:
+ i=i+1
+ new_string = new_string +'\n'+ new_line
+ return new_string, i
+
+ def _get_error(self, string):
+ """
+ Fetches only the error from the string.
+ Returns None if no error.
+ """
+ obj = re.search("!.+\n.+",string);
+ if obj:
+ return obj.group()
+ return None
+
+ def _strip_output(self, out):
+ """
+ Cleans whitespace from the output
+ """
+ strip_out = "Message"
+ for l in out.split('\n'):
+ if l.strip():
+ strip_out = strip_out+"\n"+l.strip()
+ return strip_out
+
+ def _set_exec(self, fname):
+ os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
+ | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
+ | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
def create_test_case(self):
- # Create assert based test cases in python
+ """
+ Create assert based test cases in python
+ """
if self.language == "python":
test_code = ""
for test_case in self.test_parameter:
@@ -163,18 +533,6 @@ class TestCode(object):
test_code += tcode + "\n"
return test_code
- if self.language == "c++":
- pass
-
- if self.language == "c":
- pass
-
- if self.language == "java":
- pass
-
- if self.language == "scilab":
- pass
-
def _change_dir(self, in_dir):
if in_dir is not None and isdir(in_dir):
os.chdir(in_dir)
@@ -198,839 +556,13 @@ class CodeServer(object):
"""Calls the TestCode Class to test the current code"""
tc = TestCode(info_parameter, in_dir)
result = tc.run_code()
-
# Put us back into the server pool queue since we are free now.
self.queue.put(self.port)
return json.dumps(result)
- def run_python_code(self, answer, test_parameter, in_dir=None):
- """Tests given Python function (`answer`) with the `test_code`
- supplied. If the optional `in_dir` keyword argument is supplied
- it changes the directory to that directory (it does not change
- it back to the original when done). This function also timesout
- when the function takes more than SERVER_TIMEOUT seconds to run
- to prevent runaway code.
- Returns
- -------
-
- A tuple: (success, error message).
-
- """
- if in_dir is not None and isdir(in_dir):
- os.chdir(in_dir)
-
- # Add a new signal handler for the execution of this code.
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(SERVER_TIMEOUT)
-
- test_parameter = json.loads(test_parameter)
- success = False
- tb = None
-
- test_code = ""
- for test_case in test_parameter:
- pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
- else ""
- kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
- if test_case.get('kw_args') else ""
- args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
- tcode = "assert {0}({1}) == {2}" \
- .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
- test_code += tcode + "\n"
- try:
- submitted = compile(answer, '', mode='exec')
- g = {}
- exec submitted in g
- _tests = compile(test_code, '', mode='exec')
- exec _tests in g
- except TimeoutException:
- err = self.timeout_msg
- except AssertionError:
- type, value, tb = sys.exc_info()
- info = traceback.extract_tb(tb)
- fname, lineno, func, text = info[-1]
- text = str(test_code).splitlines()[lineno-1]
- err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
- except:
- type, value = sys.exc_info()[:2]
- err = "Error: {0}".format(repr(value))
- else:
- success = True
- err = 'Correct answer'
- finally:
- del tb
- # Set back any original signal handler.
- signal.signal(signal.SIGALRM, old_handler)
-
- # Cancel the signal if any, see signal.alarm documentation.
- signal.alarm(0)
-
- # Put us back into the server pool queue since we are free now.
- self.queue.put(self.port)
-
- result = {'success': success, 'error': err}
- return result
-
-# ###############################################################################
-# # `CodeServer` class.
-# ###############################################################################
-# class CodeServer(object):
-# """A code server that executes user submitted test code, tests it and
-# reports if the code was correct or not.
-# """
-# def __init__(self, port, queue):
-# self.port = port
-# self.queue = queue
-# msg = 'Code took more than %s seconds to run. You probably '\
-# 'have an infinite loop in your code.' % SERVER_TIMEOUT
-# self.timeout_msg = msg
-
-# def run_python_code(self, answer, test_parameter, in_dir=None):
-# """Tests given Python function (`answer`) with the `test_code`
-# supplied. If the optional `in_dir` keyword argument is supplied
-# it changes the directory to that directory (it does not change
-# it back to the original when done). This function also timesout
-# when the function takes more than SERVER_TIMEOUT seconds to run
-# to prevent runaway code.
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# test_parameter = json.loads(test_parameter)
-# success = False
-# tb = None
-
-# test_code = ""
-# for test_case in test_parameter:
-# pos_args = ", ".join(str(i) for i in test_case.get('pos_args')) if test_case.get('pos_args') \
-# else ""
-# kw_args = ", ".join(str(k+"="+a) for k, a in test_case.get('kw_args').iteritems()) \
-# if test_case.get('kw_args') else ""
-# args = pos_args + ", " + kw_args if pos_args and kw_args else pos_args or kw_args
-# tcode = "assert {0}({1}) == {2}" \
-# .format(test_case.get('func_name'), args, test_case.get('expected_answer'))
-# test_code += tcode + "\n"
-# try:
-# submitted = compile(answer, '', mode='exec')
-# g = {}
-# exec submitted in g
-# _tests = compile(test_code, '', mode='exec')
-# exec _tests in g
-# except TimeoutException:
-# err = self.timeout_msg
-# except AssertionError:
-# type, value, tb = sys.exc_info()
-# info = traceback.extract_tb(tb)
-# fname, lineno, func, text = info[-1]
-# text = str(test_code).splitlines()[lineno-1]
-# err = "{0} {1} in: {2}".format(type.__name__, str(value), text)
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# else:
-# success = True
-# err = 'Correct answer'
-# finally:
-# del tb
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def run_bash_code(self, answer, test_parameter, in_dir=None):
-# """Tests given Bash code (`answer`) with the `test_code` supplied.
-
-# The testcode should typically contain two lines, the first is a path to
-# the reference script we are to compare against. The second is a path
-# to the arguments to be supplied to the reference and submitted script.
-# The output of these will be compared for correctness.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# If the optional `in_dir` keyword argument is supplied it changes the
-# directory to that directory (it does not change it back to the original
-# when done).
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# def _set_exec(fname):
-# os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
-# | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
-# | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
-# submit_f = open('submit.sh', 'w')
-# submit_f.write(answer.lstrip())
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-# _set_exec(submit_path)
-
-# # ref path and path to arguments is a comma seperated string obtained
-# #from ref_code_path field in TestCase Mode
-# path_list = test_parameter.get('ref_code_path').split(',')
-# ref_path = path_list[0].strip() if path_list[0] else ""
-# test_case_path = path_list[1].strip() if path_list[1] else ""
-
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-# if not test_case_path.startswith('/'):
-# test_case_path = join(MY_DIR, test_case_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# success, err = self.check_bash_script(ref_path, submit_path,
-# test_case_path)
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def _run_command(self, cmd_args, *args, **kw):
-# """Run a command in a subprocess while blocking, the process is killed
-# if it takes more than 2 seconds to run. Return the Popen object, the
-# stdout and stderr.
-# """
-# try:
-# proc = subprocess.Popen(cmd_args, *args, **kw)
-# stdout, stderr = proc.communicate()
-# except TimeoutException:
-# # Runaway code, so kill it.
-# proc.kill()
-# # Re-raise exception.
-# raise
-# return proc, stdout, stderr
-
-# def check_bash_script(self, ref_script_path, submit_script_path,
-# test_case_path=None):
-# """ Function validates student script using instructor script as
-# reference. Test cases can optionally be provided. The first argument
-# ref_script_path, is the path to instructor script, it is assumed to
-# have executable permission. The second argument submit_script_path, is
-# the path to the student script, it is assumed to have executable
-# permission. The Third optional argument is the path to test the
-# scripts. Each line in this file is a test case and each test case is
-# passed to the script as standard arguments.
-
-# Returns
-# --------
-
-# returns (True, "Correct answer") : If the student script passes all
-# test cases/have same output, when compared to the instructor script
-
-# returns (False, error_msg): If the student script fails a single
-# test/have dissimilar output, when compared to the instructor script.
-
-# Returns (False, error_msg): If mandatory arguments are not files or if
-# the required permissions are not given to the file(s).
-
-# """
-# if not ref_script_path:
-# return False, "No Test Script path found"
-# if not isfile(ref_script_path):
-# return False, "No file at %s" % ref_script_path
-# if not isfile(submit_script_path):
-# return False, 'No file at %s' % submit_script_path
-# if not os.access(ref_script_path, os.X_OK):
-# return False, 'Script %s is not executable' % ref_script_path
-# if not os.access(submit_script_path, os.X_OK):
-# return False, 'Script %s is not executable' % submit_script_path
-
-# if test_case_path is None or "":
-# ret = self._run_command(ref_script_path, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, inst_stdout, inst_stderr = ret
-# ret = self._run_command(submit_script_path, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdnt_stdout, stdnt_stderr = ret
-# if inst_stdout == stdnt_stdout:
-# return True, 'Correct answer'
-# else:
-# err = "Error: expected %s, got %s" % (inst_stderr,
-# stdnt_stderr)
-# return False, err
-# else:
-# if not isfile(test_case_path):
-# return False, "No test case at %s" % test_case_path
-# if not os.access(ref_script_path, os.R_OK):
-# return False, "Test script %s, not readable" % test_case_path
-# valid_answer = True # We initially make it one, so that we can
-# # stop once a test case fails
-# loop_count = 0 # Loop count has to be greater than or
-# # equal to one.
-# # Useful for caching things like empty
-# # test files,etc.
-# test_cases = open(test_case_path).readlines()
-# num_lines = len(test_cases)
-# for test_case in test_cases:
-# loop_count += 1
-# if valid_answer:
-# args = [ref_script_path] + [x for x in test_case.split()]
-# ret = self._run_command(args, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, inst_stdout, inst_stderr = ret
-# args = [submit_script_path]+[x for x in test_case.split()]
-# ret = self._run_command(args, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdnt_stdout, stdnt_stderr = ret
-# valid_answer = inst_stdout == stdnt_stdout
-# if valid_answer and (num_lines == loop_count):
-# return True, "Correct answer"
-# else:
-# err = "Error:expected %s, got %s" % (inst_stdout+inst_stderr,
-# stdnt_stdout+stdnt_stderr)
-# return False, err
-
-# def run_c_code(self, answer, test_parameter, in_dir=None):
-# """Tests given C code (`answer`) with the `test_code` supplied.
-
-# The testcode is a path to the reference code.
-# The reference code will call the function submitted by the student.
-# The reference code will check for the expected output.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# If the optional `in_dir` keyword argument is supplied it changes the
-# directory to that directory (it does not change it back to the original
-# when done).
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # File extension must be .c
-# submit_f = open('submit.c', 'w')
-# submit_f.write(answer.lstrip())
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-
-# ref_path = test_parameter.get('ref_code_path').strip()
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# success, err = self._check_c_cpp_code(ref_path, submit_path)
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def _compile_command(self, cmd, *args, **kw):
-# """Compiles C/C++/java code and returns errors if any.
-# Run a command in a subprocess while blocking, the process is killed
-# if it takes more than 2 seconds to run. Return the Popen object, the
-# stderr.
-# """
-# try:
-# proc_compile = subprocess.Popen(cmd, shell=True, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# out, err = proc_compile.communicate()
-# except TimeoutException:
-# # Runaway code, so kill it.
-# proc_compile.kill()
-# # Re-raise exception.
-# raise
-# return proc_compile, err
-
-# def _check_c_cpp_code(self, ref_code_path, submit_code_path):
-# """ Function validates student code using instructor code as
-# reference.The first argument ref_code_path, is the path to
-# instructor code, it is assumed to have executable permission.
-# The second argument submit_code_path, is the path to the student
-# code, it is assumed to have executable permission.
-
-# Returns
-# --------
-
-# returns (True, "Correct answer") : If the student function returns
-# expected output when called by reference code.
-
-# returns (False, error_msg): If the student function fails to return
-# expected output when called by reference code.
-
-# Returns (False, error_msg): If mandatory arguments are not files or
-# if the required permissions are not given to the file(s).
-
-# """
-# if not isfile(ref_code_path):
-# return False, "No file at %s" % ref_code_path
-# if not isfile(submit_code_path):
-# return False, 'No file at %s' % submit_code_path
-
-# success = False
-# output_path = os.getcwd() + '/output'
-# compile_command = "g++ %s -c -o %s" % (submit_code_path, output_path)
-# ret = self._compile_command(compile_command)
-# proc, stdnt_stderr = ret
-
-# # Only if compilation is successful, the program is executed
-# # And tested with testcases
-# if stdnt_stderr == '':
-# executable = os.getcwd() + '/executable'
-# compile_main = "g++ %s %s -o %s" % (ref_code_path, output_path,
-# executable)
-# ret = self._compile_command(compile_main)
-# proc, main_err = ret
-# if main_err == '':
-# args = [executable]
-# ret = self._run_command(args, stdin=None,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdout, stderr = ret
-# if proc.returncode == 0:
-# success, err = True, "Correct answer"
-# else:
-# err = stdout + "\n" + stderr
-# os.remove(executable)
-# else:
-# err = "Error:"
-# try:
-# error_lines = main_err.splitlines()
-# for e in error_lines:
-# err = err + "\n" + e.split(":", 1)[1]
-# except:
-# err = err + "\n" + main_err
-# os.remove(output_path)
-# else:
-# err = "Compilation Error:"
-# try:
-# error_lines = stdnt_stderr.splitlines()
-# for e in error_lines:
-# if ':' in e:
-# err = err + "\n" + e.split(":", 1)[1]
-# else:
-# err = err + "\n" + e
-# except:
-# err = err + "\n" + stdnt_stderr
-# return success, err
-
-# def run_cplus_code(self, answer, test_parameter, in_dir=None):
-# """Tests given C++ code (`answer`) with the `test_code` supplied.
-
-# The testcode is a path to the reference code.
-# The reference code will call the function submitted by the student.
-# The reference code will check for the expected output.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# If the optional `in_dir` keyword argument is supplied it changes the
-# directory to that directory (it does not change it back to the original
-# when done).
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # The file extension must be .cpp
-# submit_f = open('submitstd.cpp', 'w')
-# submit_f.write(answer.lstrip())
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-
-# ref_path = test_parameter.get('ref_code_path').strip()
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# success, err = self._check_c_cpp_code(ref_path, submit_path)
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def run_java_code(self, answer, test_parameter, in_dir=None):
-# """Tests given java code (`answer`) with the `test_code` supplied.
-
-# The testcode is a path to the reference code.
-# The reference code will call the function submitted by the student.
-# The reference code will check for the expected output.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# If the optional `in_dir` keyword argument is supplied it changes the
-# directory to that directory (it does not change it back to the original
-# when done).
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # The file extension must be .java
-# # The class name and file name must be same in java
-# submit_f = open('Test.java', 'w')
-# submit_f.write(answer.lstrip())
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-
-# ref_path = test_parameter.get('ref_code_path').strip()
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# success, err = self._check_java_code(ref_path, submit_path)
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# return success, err
-
-# def _check_java_code(self, ref_code_path, submit_code_path):
-# """ Function validates student code using instructor code as
-# reference.The first argument ref_code_path, is the path to
-# instructor code, it is assumed to have executable permission.
-# The second argument submit_code_path, is the path to the student
-# code, it is assumed to have executable permission.
-
-# Returns
-# --------
-
-# returns (True, "Correct answer") : If the student function returns
-# expected output when called by reference code.
-
-# returns (False, error_msg): If the student function fails to return
-# expected output when called by reference code.
-
-# Returns (False, error_msg): If mandatory arguments are not files or
-# if the required permissions are not given to the file(s).
-
-# """
-# if not isfile(ref_code_path):
-# return False, "No file at %s" % ref_code_path
-# if not isfile(submit_code_path):
-# return False, 'No file at %s' % submit_code_path
-
-# success = False
-# compile_command = "javac %s" % (submit_code_path)
-# ret = self._compile_command(compile_command)
-# proc, stdnt_stderr = ret
-# stdnt_stderr = self._remove_null_substitute_char(stdnt_stderr)
-
-# # Only if compilation is successful, the program is executed
-# # And tested with testcases
-# if stdnt_stderr == '':
-# student_directory = os.getcwd() + '/'
-# student_file_name = "Test"
-# compile_main = "javac %s -classpath %s -d %s" % (ref_code_path,
-# student_directory,
-# student_directory)
-# ret = self._compile_command(compile_main)
-# proc, main_err = ret
-# main_err = self._remove_null_substitute_char(main_err)
-
-# if main_err == '':
-# main_file_name = (ref_code_path.split('/')[-1]).split('.')[0]
-# run_command = "java -cp %s %s" % (student_directory,
-# main_file_name)
-# ret = self._run_command(run_command,
-# stdin=None,
-# shell=True,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdout, stderr = ret
-# if proc.returncode == 0:
-# success, err = True, "Correct answer"
-# else:
-# err = stdout + "\n" + stderr
-# success = False
-# os.remove("%s%s.class" % (student_directory, main_file_name))
-# else:
-# err = "Error:\n"
-# try:
-# error_lines = main_err.splitlines()
-# for e in error_lines:
-# if ':' in e:
-# err = err + "\n" + e.split(":", 1)[1]
-# else:
-# err = err + "\n" + e
-# except:
-# err = err + "\n" + main_err
-# os.remove("%s%s.class" % (student_directory, student_file_name))
-# else:
-# err = "Compilation Error:\n"
-# try:
-# error_lines = stdnt_stderr.splitlines()
-# for e in error_lines:
-# if ':' in e:
-# err = err + "\n" + e.split(":", 1)[1]
-# else:
-# err = err + "\n" + e
-# except:
-# err = err + "\n" + stdnt_stderr
-# result = {'success': success, 'error': err}
-# return result
-
-# def _remove_null_substitute_char(self, string):
-# """Returns a string without any null and substitute characters"""
-# stripped = ""
-# for c in string:
-# if ord(c) is not 26 and ord(c) is not 0:
-# stripped = stripped + c
-# return ''.join(stripped)
-
-# def run_scilab_code(self, answer, test_parameter, in_dir=None):
-# """Tests given Scilab function (`answer`) with the `test_code`
-# supplied. If the optional `in_dir` keyword argument is supplied
-# it changes the directory to that directory (it does not change
-# it back to the original when done). This function also timesout
-# when the function takes more than SERVER_TIMEOUT seconds to run
-# to prevent runaway code.
-
-# The testcode is a path to the reference code.
-# The reference code will call the function submitted by the student.
-# The reference code will check for the expected output.
-
-# If the path's start with a "/" then we assume they are absolute paths.
-# If not, we assume they are relative paths w.r.t. the location of this
-# code_server script.
-
-# Returns
-# -------
-
-# A tuple: (success, error message).
-
-# """
-# if in_dir is not None and isdir(in_dir):
-# os.chdir(in_dir)
-
-# # Removes all the commands that terminates scilab
-# answer,i = self._remove_scilab_exit(answer.lstrip())
-
-# # Throw message if there are commmands that terminates scilab
-# add_err=""
-# if i > 0:
-# add_err = "Please do not use exit, quit and abort commands in your\
-# code.\n Otherwise your code will not be evaluated\
-# correctly.\n"
-
-# # The file extension should be .sci
-# submit_f = open('function.sci','w')
-# submit_f.write(answer)
-# submit_f.close()
-# submit_path = abspath(submit_f.name)
-
-# ref_path = test_parameter.get('ref_code_path').strip()
-# if not ref_path.startswith('/'):
-# ref_path = join(MY_DIR, ref_path)
-
-# # Add a new signal handler for the execution of this code.
-# old_handler = signal.signal(signal.SIGALRM, timeout_handler)
-# signal.alarm(SERVER_TIMEOUT)
-
-# # Do whatever testing needed.
-# success = False
-# try:
-# cmd = 'printf "lines(0)\nexec(\'{0}\',2);\nquit();"'.format(ref_path)
-# cmd += ' | timeout 8 scilab-cli -nb'
-# ret = self._run_command(cmd,
-# shell=True,
-# stdout=subprocess.PIPE,
-# stderr=subprocess.PIPE)
-# proc, stdout, stderr = ret
-
-# # Get only the error.
-# stderr = self._get_error(stdout)
-# if stderr is None:
-# # Clean output
-# stdout = self._strip_output(stdout)
-# if proc.returncode == 5:
-# success, err = True, "Correct answer"
-# else:
-# err = add_err + stdout
-# else:
-# err = add_err + stderr
-# except TimeoutException:
-# err = self.timeout_msg
-# except:
-# type, value = sys.exc_info()[:2]
-# err = "Error: {0}".format(repr(value))
-# finally:
-# # Set back any original signal handler.
-# signal.signal(signal.SIGALRM, old_handler)
-
-# # Delete the created file.
-# os.remove(submit_path)
-
-# # Cancel the signal if any, see signal.alarm documentation.
-# signal.alarm(0)
-
-# # Put us back into the server pool queue since we are free now.
-# self.queue.put(self.port)
-
-# result = {'success': success, 'error': err}
-# return result
-
-# def _remove_scilab_exit(self, string):
-# """
-# Removes exit, quit and abort from the scilab code
-# """
-# new_string = ""
-# i=0
-# for line in string.splitlines():
-# new_line = re.sub(r"exit.*$","",line)
-# new_line = re.sub(r"quit.*$","",new_line)
-# new_line = re.sub(r"abort.*$","",new_line)
-# if line != new_line:
-# i=i+1
-# new_string = new_string +'\n'+ new_line
-# return new_string, i
-
- def _get_error(self, string):
- """
- Fetches only the error from the string.
- Returns None if no error.
- """
- obj = re.search("!.+\n.+",string);
- if obj:
- return obj.group()
- return None
-
- def _strip_output(self, out):
- """
- Cleans whitespace from the output
- """
- strip_out = "Message"
- for l in out.split('\n'):
- if l.strip():
- strip_out = strip_out+"\n"+l.strip()
- return strip_out
-
def run(self):
- """Run XMLRPC server, serving our methods.
- """
+ """Run XMLRPC server, serving our methods."""
server = SimpleXMLRPCServer(("localhost", self.port))
self.server = server
server.register_instance(self)
--
cgit
From 28ba37e907553aeac3841e221853683b9171f0db Mon Sep 17 00:00:00 2001
From: ankitjavalkar
Date: Wed, 18 Mar 2015 12:16:20 +0530
Subject: Changes to Question model, Views, Add-Question UI - ref_code_path is
now part of Question model - MCQ/MCC answers checked using solution field in
question model - Formset should reload even after errors - add_question
page chould display solution field only in MCQ/MCC
---
testapp/exam/forms.py | 23 ++++++++++++-----------
testapp/exam/models.py | 12 ++++++++----
testapp/exam/static/exam/js/add_question.js | 15 ++++++++++++---
testapp/exam/templates/exam/add_question.html | 9 +++++----
testapp/exam/templates/exam/edit_question.html | 24 ------------------------
testapp/exam/views.py | 14 ++++++++------
6 files changed, 45 insertions(+), 52 deletions(-)
(limited to 'testapp/exam')
diff --git a/testapp/exam/forms.py b/testapp/exam/forms.py
index e3cef40..93584a6 100644
--- a/testapp/exam/forms.py
+++ b/testapp/exam/forms.py
@@ -187,8 +187,8 @@ class QuestionForm(forms.ModelForm):
description = forms.CharField(widget=forms.Textarea\
(attrs={'cols': 40, 'rows': 1}))
points = forms.FloatField()
- # test = forms.CharField(widget=forms.Textarea\
- # (attrs={'cols': 40, 'rows': 1}), required=False)
+ solution = forms.CharField(widget=forms.Textarea\
+ (attrs={'cols': 40, 'rows': 1}), required=False)
options = forms.CharField(widget=forms.Textarea\
(attrs={'cols': 40, 'rows': 1}), required=False)
language = forms.CharField(max_length=20, widget=forms.Select\
@@ -199,17 +199,18 @@ class QuestionForm(forms.ModelForm):
tags = TagField(widget=TagAutocomplete(), required=False)
snippet = forms.CharField(widget=forms.Textarea\
(attrs={'cols': 40, 'rows': 1}), required=False)
+ ref_code_path = forms.CharField(widget=forms.Textarea\
+ (attrs={'cols': 40, 'rows': 1}), required=False)
def save(self, commit=True):
- summary = self.cleaned_data["summary"]
- description = self.cleaned_data["description"]
- points = self.cleaned_data['points']
- # test = self.cleaned_data["test"]
- options = self.cleaned_data['options']
- language = self.cleaned_data['language']
- type = self.cleaned_data["type"]
- active = self.cleaned_data["active"]
- snippet = self.cleaned_data["snippet"]
+ summary = self.cleaned_data.get("summary")
+ description = self.cleaned_data.get("description")
+ points = self.cleaned_data.get("points")
+ options = self.cleaned_data.get("options")
+ language = self.cleaned_data.get("language")
+ type = self.cleaned_data.get("type")
+ active = self.cleaned_data.get("active")
+ snippet = self.cleaned_data.get("snippet")
new_question = Question()
new_question.summary = summary
diff --git a/testapp/exam/models.py b/testapp/exam/models.py
index dd0c806..5989be4 100644
--- a/testapp/exam/models.py
+++ b/testapp/exam/models.py
@@ -60,8 +60,11 @@ class Question(models.Model):
# Number of points for the question.
points = models.FloatField(default=1.0)
- # # Test cases for the question in the form of code that is run.
- # test = models.TextField(blank=True)
+ # Answer for MCQs.
+ solution = models.TextField(blank=True)
+
+ # Test cases file paths
+ ref_code_path = models.TextField(blank=True)
# Any multiple choice options. Place one option per line.
options = models.TextField(blank=True)
@@ -114,6 +117,7 @@ class Question(models.Model):
info_parameter['id'] = self.id
info_parameter['user_answer'] = user_answer
info_parameter['test_parameter'] = test_case_parameter
+ info_parameter['ref_code_path'] = self.ref_code_path
return json.dumps(info_parameter)
@@ -449,5 +453,5 @@ class TestCase(models.Model):
# Test case Expected answer in list form
expected_answer = models.TextField(blank=True, null = True)
- # Test case path to system test code applicable for CPP, C, Java and Scilab
- ref_code_path = models.TextField(blank=True, null = True)
+ # # Test case path to system test code applicable for CPP, C, Java and Scilab
+ # ref_code_path = models.TextField(blank=True, null = True)
diff --git a/testapp/exam/static/exam/js/add_question.js b/testapp/exam/static/exam/js/add_question.js
index 267cdb2..5a94f4c 100644
--- a/testapp/exam/static/exam/js/add_question.js
+++ b/testapp/exam/static/exam/js/add_question.js
@@ -153,13 +153,17 @@ function textareaformat()
if(value == 'mcq' || value == 'mcc')
{
document.getElementById('id_options').style.visibility='visible';
- document.getElementById('label_option').innerHTML="Options :"
+ document.getElementById('label_option').innerHTML="Options :";
+ document.getElementById('id_solution').style.visibility='visible';
+ document.getElementById('label_solution').innerHTML="Solutions :";
}
else
{
document.getElementById('id_options').style.visibility='hidden';
document.getElementById('label_option').innerHTML = "";
+ document.getElementById('id_solution').style.visibility='hidden';
+ document.getElementById('label_solution').innerHTML=""
}
});
document.getElementById('my').innerHTML = document.getElementById('id_description').value ;
@@ -168,12 +172,16 @@ function textareaformat()
{
document.getElementById('id_options').style.visibility='visible';
document.getElementById('label_option').innerHTML="Options :"
+ document.getElementById('id_solution').style.visibility='visible';
+ document.getElementById('label_solution').innerHTML="Solutions :";
}
else
{
document.getElementById('id_options').style.visibility='hidden';
document.getElementById('label_option').innerHTML = "";
+ document.getElementById('id_solution').style.visibility='hidden';
+ document.getElementById('label_solution').innerHTML=""
}
}
@@ -189,8 +197,9 @@ function autosubmit()
if(type.value == 'select')
{
type.style.border = 'solid red';
- return false;
- }
+ return false;
+ }
+
if (type.value == 'mcq' || type.value == 'mcc')
{
diff --git a/testapp/exam/templates/exam/add_question.html b/testapp/exam/templates/exam/add_question.html
index e5e2574..e117ef3 100644
--- a/testapp/exam/templates/exam/add_question.html
+++ b/testapp/exam/templates/exam/add_question.html
@@ -27,11 +27,12 @@