diff options
author | Sunil Shetye | 2021-12-02 00:19:09 +0530 |
---|---|---|
committer | Sunil Shetye | 2021-12-02 00:19:26 +0530 |
commit | 8032b8624a15c405f98a35f3717cab34db889b21 (patch) | |
tree | 7b9b482a95cd4d8dd385fb95bb79598dc1bc2bba | |
parent | f55183c54d08ffce07ce08606d28cbaef831ea10 (diff) | |
download | Common-Interface-Project-8032b8624a15c405f98a35f3717cab34db889b21.tar.gz Common-Interface-Project-8032b8624a15c405f98a35f3717cab34db889b21.tar.bz2 Common-Interface-Project-8032b8624a15c405f98a35f3717cab34db889b21.zip |
add STREAM as a celery state
-rw-r--r-- | blocks/simulationAPI/helpers/ngspice_helper.py | 2 | ||||
-rw-r--r-- | blocks/simulationAPI/models.py | 1 | ||||
-rw-r--r-- | blocks/simulationAPI/serializers.py | 2 | ||||
-rw-r--r-- | blocks/simulationAPI/tasks.py | 5 | ||||
-rw-r--r-- | blocks/simulationAPI/urls.py | 3 | ||||
-rw-r--r-- | blocks/simulationAPI/views.py | 181 |
6 files changed, 190 insertions, 4 deletions
diff --git a/blocks/simulationAPI/helpers/ngspice_helper.py b/blocks/simulationAPI/helpers/ngspice_helper.py index 6e9171e2..12e68cb7 100644 --- a/blocks/simulationAPI/helpers/ngspice_helper.py +++ b/blocks/simulationAPI/helpers/ngspice_helper.py @@ -102,7 +102,7 @@ def ExecXml(filepath, file_id, parameters): logger.info('err=%s', err) log_name = '/tmp/blocks-tmp/scilab-log.txt' # FIXME: remove this line - return ('Success', log_name) + return ('Success', log_name, proc.returncode) except BaseException as e: logger.exception('Encountered Exception:') logger.info('removing %s', filepath) diff --git a/blocks/simulationAPI/models.py b/blocks/simulationAPI/models.py index 35bf8157..2124fd47 100644 --- a/blocks/simulationAPI/models.py +++ b/blocks/simulationAPI/models.py @@ -26,6 +26,7 @@ class TaskFile(models.Model): parameters = models.TextField(blank=True, null=True) upload_time = models.DateTimeField(auto_now=True) log_name = models.CharField(max_length=500, blank=True, null=True) + returncode = models.IntegerField(blank=True, null=True) task = models.OneToOneField(Task, on_delete=models.CASCADE, related_name='file') diff --git a/blocks/simulationAPI/serializers.py b/blocks/simulationAPI/serializers.py index b3d22120..ee7da0fe 100644 --- a/blocks/simulationAPI/serializers.py +++ b/blocks/simulationAPI/serializers.py @@ -10,7 +10,7 @@ class TaskFileSerializer(serializers.ModelSerializer): class Meta: model = TaskFile fields = ('file_id', 'file', 'app_name', 'parameters', 'upload_time', - 'log_name', 'task') + 'log_name', 'returncode', 'task') class TaskSerializer(serializers.HyperlinkedModelSerializer): diff --git a/blocks/simulationAPI/tasks.py b/blocks/simulationAPI/tasks.py index bb95a0ea..f3c4891c 100644 --- a/blocks/simulationAPI/tasks.py +++ b/blocks/simulationAPI/tasks.py @@ -30,10 +30,11 @@ def process_task(task_id): output = ngspice_helper.ExecXml(file_path, file_id, parameters) if output[0] == "Success": file_obj.log_name = output[1] + file_obj.returncode = output[2] file_obj.save() current_task.update_state( - state='PROGRESS', - meta={'current_process': 'Processed Xml, Loading Output'}) + state='STREAM', + meta={'current_process': 'Processed Xml, Streaming Output'}) return output[0] except Exception as e: diff --git a/blocks/simulationAPI/urls.py b/blocks/simulationAPI/urls.py index eab40590..f03380d1 100644 --- a/blocks/simulationAPI/urls.py +++ b/blocks/simulationAPI/urls.py @@ -8,4 +8,7 @@ urlpatterns = [ path('status/<uuid:task_id>', simulationAPI_views.CeleryResultView.as_view(), name='celery_status'), + + path('stream/<uuid:task_id>', + simulationAPI_views.StreamView.as_view(), name='stream_status'), ] diff --git a/blocks/simulationAPI/views.py b/blocks/simulationAPI/views.py index 626c6175..f1056bf3 100644 --- a/blocks/simulationAPI/views.py +++ b/blocks/simulationAPI/views.py @@ -1,3 +1,4 @@ +from simulationAPI.models import TaskFile from simulationAPI.serializers import TaskSerializer from simulationAPI.tasks import process_task from rest_framework import status @@ -7,10 +8,26 @@ from rest_framework.permissions import AllowAny from rest_framework.views import APIView from rest_framework.response import Response from celery.result import AsyncResult +import os import uuid import logging +import time +SCILAB_INSTANCE_TIMEOUT_INTERVAL = 300 +MAX_LOG_SIZE = 512 * 1024 +LOOK_DELAY = 0.1 + +# States of the line +# to indicate initialization of block in log file is encountered +INITIALIZATION = 0 +# to indicate ending of log file data for that block is encountered +ENDING = 1 +# to indicate data is proper and can be read +DATA = 2 +# to indicate there is no line in log file further +NOLINE = -1 + logger = logging.getLogger(__name__) @@ -62,3 +79,167 @@ class CeleryResultView(APIView): 'details': celery_result.info } return Response(response_data) + + +def parse_line(line, lineno): + ''' + Function to parse the line + Returns tuple of figure ID and state + state = INITIALIZATION if new figure is created + ENDING if current fig end + DATA otherwise + ''' + line_words = line.split(' ') # Each line is split to read condition + try: + # The below condition determines the block ID + if line_words[0] == "Initialization": + # New figure created + # Get fig id + # to extract figure ids (sometime multiple sinks can be used in one + # diagram to differentiate that) + figure_id = line_words[-1] + return (figure_id, INITIALIZATION) + elif line_words[0] == "Ending": + # Current figure end + # Get fig id + figure_id = line_words[-1] + return (figure_id, ENDING) + else: + # Current figure coordinates + figure_id = line_words[2] + return (figure_id, DATA) + except Exception as e: + logger.error('%s while parsing %s on line %s', str(e), line, lineno) + return (None, NOLINE) + + + +def get_line_and_state(file, figure_list, lineno, incomplete_line): + ''' + Function to get a new line from file + This also parses the line and appends new figures to figure List + ''' + line = file.readline() # read line by line from log + if not line: # if line is empty then return noline + return (incomplete_line, NOLINE) + if incomplete_line is not None: + line = incomplete_line + line + if '\n' not in line: + return (line, NOLINE) + # every line is passed to function parse_line for getting values + line = line.rstrip() + parse_result = parse_line(line, lineno) + figure_id = parse_result[0] + state = parse_result[1] + if state == INITIALIZATION: + # New figure created + # Add figure ID to list + figure_list.append(figure_id) # figure id of block is added to list + return (None, INITIALIZATION) + elif state == ENDING: + # End of figure + # Remove figure ID from list + # Once ending of log file/data is encountered for that block, figure id + # will be removed + figure_list.remove(figure_id) + return (None, ENDING) + elif state == NOLINE: + return (None, NOLINE) + return (line, DATA) + + +class StreamView(APIView): + """ + + Streams Simulation results for 'task_id' provided after + uploading the xml + /api/stream/<uuid> + + """ + permission_classes = (AllowAny,) + methods = ['GET'] + + def get(self, request, task_id): + return Response(self.event_stream(task_id), content_type='text/event-stream') + + def event_stream(self, task_id): + if not isinstance(task_id, uuid.UUID): + raise ValidationError('Invalid uuid format') + + while True: + file_obj = TaskFile.objects.get(task_id=task_id) + log_name = file_obj.log_name + returncode = file_obj.returncode + if log_name is None and returncode is None: + time.sleep(LOOK_DELAY) + continue + if log_name is None or log_name[0] != '/': + raise ValidationError('Invalid log_name format') + if not os.path.isfile(log_name): + logger.warning('log file does not exist') + yield "event: ERROR\ndata: no log file found\n\n" + return + if os.stat(log_name).st_size == 0 and returncode is None: + time.sleep(LOOK_DELAY) + continue + break + + # Open the log file + if os.stat(log_name).st_size == 0 and \ + returncode is not None: + logger.warning('log file is empty') + yield "event: ERROR\ndata: log file is empty\n\n" + return + + with open(log_name, 'r') as log_file: + # Start sending log + duplicatelineno = 0 + duplicatelines = 0 + lastline = '' + lineno = 0 + line = None + endtime = time.time() + SCILAB_INSTANCE_TIMEOUT_INTERVAL + log_size = 0 + figure_list = [] + + while time.time() <= endtime and log_size <= MAX_LOG_SIZE: + (line, state) = get_line_and_state(log_file, figure_list, + lineno, line) + # if incomplete line, wait for the complete line + if state == NOLINE: + gevent.sleep(LOOK_DELAY) + continue + + if not figure_list: + break + # Get the line and loop until the state is ENDING and figure_list + # empty. Determine if we get block id and give it to chart.js + if line is None: + continue + if lastline != line: + if duplicatelineno != 0: + duplicatelines += duplicatelineno + yield "event: duplicate\ndata: %d\n\n" % duplicatelineno + duplicatelineno = 0 + lastline = line + log_size += len(line) + if state == DATA: + yield "event: log\ndata: %s\n\n" % line + else: + duplicatelineno += 1 + lineno += 1 + line = None + + if duplicatelineno != 0: + duplicatelines += duplicatelineno + yield "event: duplicate\ndata: %d\n\n" % duplicatelineno + duplicatelineno = 0 + + if duplicatelines != 0: + logger.info('lines = %s, duplicate lines = %s, log size = %s', + lineno, duplicatelines, log_size) + else: + logger.info('lines = %s, log size = %s', lineno, log_size) + + # Notify Client + yield "event: DONE\ndata: None\n\n" |