diff options
author | adityacp | 2021-01-25 09:26:34 +0530 |
---|---|---|
committer | adityacp | 2021-01-25 09:26:34 +0530 |
commit | 419c66d29f20123d6f1e04072829973d84ff40c5 (patch) | |
tree | 931a14599fe7a994edfe646c4ee622ff7949fabd /upload | |
parent | 93860c0fb3c61d64878ff73c3c28ec3fd4e1ad7a (diff) | |
parent | 9b9ebb227bbaafca3daf5485c3bbe0d948d3e843 (diff) | |
download | online_test-419c66d29f20123d6f1e04072829973d84ff40c5.tar.gz online_test-419c66d29f20123d6f1e04072829973d84ff40c5.tar.bz2 online_test-419c66d29f20123d6f1e04072829973d84ff40c5.zip |
Merge branch 'master' of https://github.com/FOSSEE/online_test into code_refactors
Diffstat (limited to 'upload')
-rw-r--r-- | upload/models.py | 0 | ||||
-rw-r--r-- | upload/urls.py | 9 | ||||
-rw-r--r-- | upload/utils.py | 551 | ||||
-rw-r--r-- | upload/views.py | 62 |
4 files changed, 622 insertions, 0 deletions
diff --git a/upload/models.py b/upload/models.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/upload/models.py diff --git a/upload/urls.py b/upload/urls.py new file mode 100644 index 0000000..207b9ea --- /dev/null +++ b/upload/urls.py @@ -0,0 +1,9 @@ +from django.conf.urls import url +from upload import views + +app_name = 'upload' + +urlpatterns = [ + url(r'^download_course_md/(?P<course_id>\d+)/$', + views.download_course_md, name="download_course_md"), +]
\ No newline at end of file diff --git a/upload/utils.py b/upload/utils.py new file mode 100644 index 0000000..cae9f1f --- /dev/null +++ b/upload/utils.py @@ -0,0 +1,551 @@ +import io +import re +import os +import json +import yaml +import requests +import more_itertools +import ruamel.yaml + +from django.core.exceptions import ObjectDoesNotExist + +from yaksh.models import Lesson, Course, LearningUnit, LearningModule, Quiz, TableOfContents + +_HEADER_RE = re.compile(r"^---\s*$") +_BLANK_RE = re.compile(r"^\s*$") +_JUPYTER_RE = re.compile(r"^meta\s*:\s*$") +_LEFTSPACE_RE = re.compile(r"^\s") +DATA_SEP = '~#~#~#' + + +def recursive_update(target, update): + """ Update recursively a (nested) dictionary with the content of another. + Inspired from https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth + """ + for key in update: + value = update[key] + if value is None: + del target[key] + elif isinstance(value, dict): + target[key] = recursive_update(target.get(key, {}), value) + else: + target[key] = value + return target + +def _metadata_to_dict(lines): + metadata= {} + ended = False + injupyter = False + # jupyter = [] + meta_lines = [] + + for i, line in enumerate(lines): + if i == 0 and _HEADER_RE.match(line): + continue + + if (i > 0) and _HEADER_RE.match(line): + ended = True + + if _JUPYTER_RE.match(line): + injupyter = True + elif line and not _LEFTSPACE_RE.match(line): + injupyter = False + + if injupyter: + meta_lines.append(line) + + if ended: + if meta_lines: + recursive_update(metadata, yaml.safe_load("\n".join(meta_lines))) + return metadata + +def _display_content_to_dict(lines): + ended = False + display_content = [] + + for i, line in enumerate(lines): + if (i > 0) and _HEADER_RE.match(line): + ended = True + continue + + if ended: + display_content.append(line) + + return ''.join(display_content) + + +class LessonData(): + type = "lesson" + display_content_field = 'description' + def __init__(self, obj, order, course_id): + toc_file = TableOfContents.objects.get_all_tocs_as_yaml( + course_id, obj.id, '{0}_lesson_toc.yaml'.format(obj.id) + ) + self.data = { + "id": obj.id, + "name": obj.name, + "active": obj.active, + "order": order, + } + if toc_file: + self.data.update( + {"toc": toc_file,} + ) + self.display_content = obj.description if obj.description else None + + +class QuizData(): + type = "quiz" + display_content_field = 'instructions' + def __init__(self, obj, order): + self.data = { + "id": obj.id, + "description": obj.description, + "active": obj.active, + "start_date_time": obj.start_date_time, + "end_date_time": obj.end_date_time, + "duration": obj.duration, + "pass_criteria": obj.pass_criteria, + "attempts_allowed": obj.attempts_allowed, + "allow_skip": obj.allow_skip, + "view_answerpaper": obj.view_answerpaper, + "order": order, + } + self.display_content = obj.instructions if obj.instructions else None + + +class UnitData(): + + @classmethod + def get_lesson_or_quiz_data(cls, units, course_id): + return [ + LessonData(unit_obj.lesson, unit_obj.order, course_id) + if unit_obj.lesson else QuizData(unit_obj.quiz, unit_obj.order) + for unit_obj in units + ] + + @classmethod + def md_to_dict(cls, lines): + all_unit_data = list( + more_itertools.split_at( + lines, + lambda x: x.strip('\n') == DATA_SEP + ) + ) + learning_units = [] + module_data = None + for unit_lines in all_unit_data: + if unit_lines: + metadata = _metadata_to_dict(unit_lines) + display_content = _display_content_to_dict(unit_lines) + clean_display_content = display_content.strip('\n') + try: + unit_data = metadata.get('meta', {}).get('data', {}) + order = unit_data.pop('order', 0) + unit_type = metadata.get('meta', {}).get('type', None) + unit_cls = LessonData if unit_type == 'lesson' else QuizData + except KeyError: + print("[ERROR] Data not found. Please check the MD file") + + unit_data.update( + {getattr(unit_cls,'display_content_field'): clean_display_content,} + ) + if unit_type == 'lesson': + learning_unit_data = { + "order": order, + "type": unit_type, + "quiz": None, + "lesson": unit_data, + } + learning_units.append(learning_unit_data) + elif unit_type == 'quiz': + learning_unit_data = { + "order": order, + "type": unit_type, + "quiz": unit_data, + "lesson": None, + } + learning_units.append(learning_unit_data) + elif unit_type == 'module': + module_data = ModuleData.md_to_dict(unit_lines) + + return {'module': module_data, 'learning_units': learning_units} + + @classmethod + def md_to_dict_from_file(cls, file): + with open(file, 'r') as f: + lines = f.readlines() + return cls.md_to_dict(lines) + + +class ModuleData(): + type = "module" + display_content_field = 'description' + def __init__(self, obj, course_id): + self.data = { + "id": obj.id, + "name": obj.name, + "order": obj.order, + "active":obj.active, + } + self.display_content = obj.description if obj.description else None + self.units = self.set_lessons(obj.get_learning_units(), course_id) + + def set_lessons(self, units, course_id): + return UnitData.get_lesson_or_quiz_data(units, course_id) + + + @classmethod + def md_to_dict(cls, lines): + metadata = _metadata_to_dict(lines) + display_content = _display_content_to_dict(lines) + clean_display_content = display_content.strip('\n') + try: + data = metadata.get('meta', {}).get('data', {}) + except KeyError: + print("[ERROR] Data not found. Please check the MD file") + + data.update({cls.display_content_field: clean_display_content}) + return data + + @classmethod + def md_to_dict_from_file(cls, file): + with open(file, 'r') as f: + lines = f.readlines() + return cls.md_to_dict(lines) + + +class CourseData(): + type = "course" + display_content_field = "instructions" + def __init__(self, obj): + self.data = { + "id": obj.id, + "name": obj.name, + "enrollment": obj.enrollment, + "active": obj.active, + "start_enroll_time": obj.start_enroll_time, + "end_enroll_time": obj.end_enroll_time, + } + self.display_content = obj.instructions if obj.instructions else None + self.modules = self.set_modules(obj.get_learning_modules(), obj.id) + + def set_modules(self, modules, course_id): + return [ModuleData(module_obj, course_id) for module_obj in modules] + + @classmethod + def md_to_dict(cls, file): + with open(file, 'r') as f: + lines = f.readlines() + metadata = _metadata_to_dict(lines) + display_content = _display_content_to_dict(lines) + clean_display_content = display_content.strip('\n') + try: + data = metadata.get('meta', {}).get('data', {}) + except KeyError: + print("[ERROR] Data not found. Please check the MD file") + + data.update({cls.display_content_field: clean_display_content}) + return data + + +def create_header(data, dtype): + header = [] + metadata = { + "type": dtype, + "data": data + } + yaml=ruamel.yaml.YAML() + yaml.default_flow_style = False + io_obj = io.StringIO() + + yaml.dump({"meta": metadata}, io_obj) + + if metadata['data'].get('id', None): + raw_yaml_data = yaml.load(io_obj.getvalue()) + raw_yaml_data['meta']['data'].yaml_add_eol_comment('Do Not Change This Value', 'id') + io_obj = io.StringIO() + yaml.dump(raw_yaml_data, io_obj) + + header.extend(io_obj.getvalue().splitlines()) + header = ["---"] + header + ["---"] + + return header + + +def get_course_data(course_id): + course_obj = Course.objects.get(id=course_id) + return CourseData(course_obj) + + +def _create_clean_file_name(name, ext): + clean_name = re.sub(r'[^\w\s-]', '', name).strip().lower() + return re.sub(r'[-\s]+', '_', clean_name) + '.' + ext + +def _get_file_name_from_object(content_object, ext): + name = ' '.join( + [content_object.type, content_object.data.get('name')] + ) + return _create_clean_file_name(name, ext) + + + +def create_md(content_object, file_name=None, multiple_obj=False): + file_content = [] + if not file_name: + dest_file_name = _get_file_name_from_object(content_object, 'md') + else: + dest_file_name = file_name + data = content_object.data + dtype = content_object.type + seperator = '\n{0}\n'.format(DATA_SEP) if multiple_obj else '' + header = create_header(data, dtype) + if content_object.display_content: + file_content.append(content_object.display_content) + + file_content = ( + '\n'.join(header) + '\n\n' + '\n'.join(file_content) + + seperator + ) + + with open(dest_file_name, 'a+') as f: + f.write(file_content) + + return dest_file_name + + +def write_course_to_file(course_id): + # Create the course md file + course = get_course_data(course_id) + course_file = create_md(course) + + course_map = {'course': course_file, 'modules': []} + + # Create the modules and lessons md files + for module in course.modules: + mod_file = create_md(module, multiple_obj=True) + for lesson in module.units: + create_md(lesson, mod_file, multiple_obj=True) + + course_map['modules'].append( + {'file': mod_file,} + ) + + with open('toc.yml', 'w') as f: + f.write(yaml.safe_dump(course_map)) + + +def convert_md_to_dict(toc, user): + course_file = toc.get('course') + course_data = CourseData.md_to_dict(course_file) + module_obj_list = [] + for module in toc.get('modules', []): + module_file = module.get('file') + module_data = UnitData.md_to_dict_from_file( + module_file + ).get('module') + module_id = module_data.get('id', None) + if module_id: + mod_created = False + module_obj = LearningModule.objects.get(id=module_id) + module_obj.__dict__.update(module_data) + module_obj.save() + + else: + mod_created = True + module_data.update({'creator': user}) + module_obj = LearningModule.objects.create( + **module_data + ) + + unit_file = module.get('units', None) + unit_list = UnitData.md_to_dict_from_file(module_file).get('learning_units') + unit_obj_list = [] + for unit in unit_list: + unit_type = unit.get('type') + lesson_or_quiz_obj = None + if unit_type == 'lesson': + lq_data = unit.pop('lesson') + lq_data.update({'creator': user}) + if lq_data.get('id'): # Lesson already exists + lesson_or_quiz_obj = Lesson.objects.get(id=lq_data.get('id')) + lesson_or_quiz_obj.__dict__.update(lq_data) + lesson_or_quiz_obj.save() + + toc_file = lq_data.get('toc', None) + if toc_file: + with open(toc_file, 'r') as tocf: + toc_data = ruamel.yaml.safe_load_all(tocf.read()) + results = TableOfContents.objects.add_contents( + course_data.get('id'), lesson_or_quiz_obj.id , user, toc_data) + for status, msg in results: + if status == False: + raise Exception(msg) + else: + lesson_or_quiz_obj = Lesson.objects.create( + **lq_data, + ) + unit['lesson'] = lesson_or_quiz_obj + + toc_file = lq_data.get('toc', None) + if toc_file: + with open(toc_file, 'r') as tocf: + toc_data = ruamel.yaml.safe_load_all(tocf.read()) + results = TableOfContents.objects.add_contents( + course_data.get('id'), lesson_or_quiz_obj.id , user, toc_data) + for status, msg in results: + if status == False: + raise Exception(msg) + else: + lq_data = unit.pop('quiz') + lq_data.update({'creator': user}) + if lq_data.get('id'): # Quiz already exists + lesson_or_quiz_obj = Quiz.objects.get(id=lq_data.get('id')) + lesson_or_quiz_obj.__dict__.update(lq_data) + lesson_or_quiz_obj.save() + else: + lesson_or_quiz_obj = Quiz.objects.create( + **lq_data, + ) + unit['quiz'] = lesson_or_quiz_obj + + if not lq_data.get('id'): + unit_obj, unit_created = LearningUnit.objects.create( + **unit + ), True + else: + lesson_or_quiz_class_map = { + 'lesson': Lesson, + 'quiz': Quiz, + } + unit_created = False + lq_obj_id = lq_data.get('id') + lq_obj = lesson_or_quiz_class_map.get(unit_type).objects.get(id=lq_obj_id) + m_units = module_obj.learning_unit.values_list('id', flat=True) + lq_units = lq_obj.learningunit_set.values_list('id', flat=True) + unit_id = list(set(m_units) & set(lq_units))[0] + + unit_obj = LearningUnit.objects.get(id=unit_id) + unit_obj.__dict__.update({'order': unit.get('order', 0)}) + unit_obj.save() + + if unit_created: + unit_obj_list.append(unit_obj) + + module_obj.learning_unit.add(*unit_obj_list) + if mod_created: + module_obj_list.append(module_obj) + + if course_data.get('id', None): + course_obj = Course.objects.get(id=course_data.get('id')) + course_obj.__dict__.update(course_data) + course_obj.save() + else: + course_data.update({'creator': user}) + course_obj, course_created = Course.objects.create( + **course_data + ), True + + course_obj.learning_module.add(*module_obj_list) + + return course_obj + + +def check_data(toc): + course_file = toc.get('course') + course_data = CourseData.md_to_dict(course_file) + course_id = course_data.get('id') + module_id_list = [] + for data_elem in toc.get('modules', []): + _file = data_elem.get('file') + _data = UnitData.md_to_dict_from_file(_file) + + module_id = _data.get('module', None).get('id', None) + if module_id: + module_id_list.append(module_id) + + lesson_id_list = [] + quiz_id_list = [] + for unit in _data.get('learning_units'): + unit_id = unit.get('id', None) + unit_type = unit.get('type', None) + if unit_id: + if unit_type == 'lesson': + lesson_id_list.append(unit_id) + else: + quiz_id_list.append(unit_id) + + try: + if not has_relationship(module_id, 'learning_module', lesson_id_list, unit_type): + msg = "Lesson IDs used in metadata do not belong to current course, Kindly inspect and reupload" + return False, msg + if not has_relationship(module_id, 'learning_module', quiz_id_list, unit_type): + msg = "Quiz IDs used in metadata do not belong to current course, Kindly inspect and reupload" + return False, msg + except ObjectDoesNotExist: + msg = "Object does not exist in DB" + return False, msg + + try: + if not has_relationship(course_id, 'course', module_id_list): + msg = "Module IDs used in metadata do not belong to current course, Kindly inspect and reupload" + return False, msg + + if has_duplicate_id(course_id, 'course', module_id_list): + msg = "Modules metadata contains duplicate IDs, Kindly inspect and reupload" + return False, msg + except ObjectDoesNotExist: + msg = "Object does not exist in DB" + return False, msg + return True, 'File check successful' + + +def get_parent_child_data_from_db(parent_id, parent_type, child_id_list, child_type=None): + if parent_type == 'learning_module': + mod_obj = LearningModule.objects.get(id=parent_id) + relationship_id_list = [ e for e in + mod_obj.learning_unit.order_by( + "order" + ).values_list( + child_type + '__id', flat=True, + ) if e != None + ] + return relationship_id_list + + elif parent_type == 'course': + course_obj = Course.objects.get(id=parent_id) + relationship_id_list = course_obj.get_learning_modules().values_list( + 'id', flat=True, + ) + return relationship_id_list + +def has_duplicate_id(parent_id, parent_type, child_id_list, child_type=None): + relationship_id_list = get_parent_child_data_from_db( + parent_id, parent_type, child_id_list, child_type + ) + if len(child_id_list) != len(set(relationship_id_list)): + return True # duplicates exist + else: + return False # duplicates do not exist + +def has_relationship(parent_id, parent_type, child_id_list, child_type=None): + relationship_id_list = get_parent_child_data_from_db( + parent_id, parent_type, child_id_list, child_type + ) + + for _id in child_id_list: + if _id not in relationship_id_list: + return False + return True + +def read_toc(file): + with open(file, 'r') as f: + toc = yaml.load(f, Loader=yaml.FullLoader) + return toc + +def upload_course(user): + toc = read_toc('toc.yml') + status, msg = check_data(toc) + course_data = convert_md_to_dict(toc, user) + return status, msg + diff --git a/upload/views.py b/upload/views.py new file mode 100644 index 0000000..fb80c07 --- /dev/null +++ b/upload/views.py @@ -0,0 +1,62 @@ +import tempfile +import os +from zipfile import ZipFile +from io import BytesIO as string_io + +from django.http import HttpResponse +from django.shortcuts import render +from django.contrib.auth.decorators import login_required +from django.contrib import messages + +from upload.utils import upload_course, write_course_to_file + + +def upload_course_md(request): + if request.method == 'POST': + status = False + msg = None + user = request.user + course_upload_file = request.FILES.get('course_upload_md') + file_extension = os.path.splitext(course_upload_file.name)[1][1:] + if file_extension not in ['zip']: + messages.warning( + request, "Please upload zip file" + ) + else: + curr_dir = os.getcwd() + try: + with tempfile.TemporaryDirectory() as tmpdirname, ZipFile(course_upload_file, 'r') as zip_file: + zip_file.extractall(tmpdirname) + os.chdir(tmpdirname) + status, msg = upload_course(user) + except Exception as e: + import traceback + traceback.print_exc() + messages.warning(request, f"Error parsing file structure: {e}") + finally: + os.chdir(curr_dir) + + return status, msg + +def download_course_md(request, course_id): + curr_dir = os.getcwd() + zip_file_name = string_io() + try: + with tempfile.TemporaryDirectory() as tmpdirname, ZipFile(zip_file_name, 'w') as zip_file: + os.chdir(tmpdirname) + write_course_to_file(course_id) + + for foldername, subfolders, filenames in os.walk(tmpdirname): + for filename in filenames: + zip_file.write(os.path.join(filename)) + except Exception as e: + messages.warning(request, f"Error while downloading file: {e}") + finally: + os.chdir(curr_dir) + + zip_file_name.seek(0) + response = HttpResponse(content_type='application/zip') + response['Content-Disposition'] = 'attachment; filename=course.zip' + response.write(zip_file_name.read()) + + return response
\ No newline at end of file |