summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSunil Shetye2021-12-02 00:19:09 +0530
committerSunil Shetye2021-12-02 00:19:26 +0530
commit8032b8624a15c405f98a35f3717cab34db889b21 (patch)
tree7b9b482a95cd4d8dd385fb95bb79598dc1bc2bba
parentf55183c54d08ffce07ce08606d28cbaef831ea10 (diff)
downloadCommon-Interface-Project-8032b8624a15c405f98a35f3717cab34db889b21.tar.gz
Common-Interface-Project-8032b8624a15c405f98a35f3717cab34db889b21.tar.bz2
Common-Interface-Project-8032b8624a15c405f98a35f3717cab34db889b21.zip
add STREAM as a celery state
-rw-r--r--blocks/simulationAPI/helpers/ngspice_helper.py2
-rw-r--r--blocks/simulationAPI/models.py1
-rw-r--r--blocks/simulationAPI/serializers.py2
-rw-r--r--blocks/simulationAPI/tasks.py5
-rw-r--r--blocks/simulationAPI/urls.py3
-rw-r--r--blocks/simulationAPI/views.py181
6 files changed, 190 insertions, 4 deletions
diff --git a/blocks/simulationAPI/helpers/ngspice_helper.py b/blocks/simulationAPI/helpers/ngspice_helper.py
index 6e9171e2..12e68cb7 100644
--- a/blocks/simulationAPI/helpers/ngspice_helper.py
+++ b/blocks/simulationAPI/helpers/ngspice_helper.py
@@ -102,7 +102,7 @@ def ExecXml(filepath, file_id, parameters):
logger.info('err=%s', err)
log_name = '/tmp/blocks-tmp/scilab-log.txt' # FIXME: remove this line
- return ('Success', log_name)
+ return ('Success', log_name, proc.returncode)
except BaseException as e:
logger.exception('Encountered Exception:')
logger.info('removing %s', filepath)
diff --git a/blocks/simulationAPI/models.py b/blocks/simulationAPI/models.py
index 35bf8157..2124fd47 100644
--- a/blocks/simulationAPI/models.py
+++ b/blocks/simulationAPI/models.py
@@ -26,6 +26,7 @@ class TaskFile(models.Model):
parameters = models.TextField(blank=True, null=True)
upload_time = models.DateTimeField(auto_now=True)
log_name = models.CharField(max_length=500, blank=True, null=True)
+ returncode = models.IntegerField(blank=True, null=True)
task = models.OneToOneField(Task, on_delete=models.CASCADE,
related_name='file')
diff --git a/blocks/simulationAPI/serializers.py b/blocks/simulationAPI/serializers.py
index b3d22120..ee7da0fe 100644
--- a/blocks/simulationAPI/serializers.py
+++ b/blocks/simulationAPI/serializers.py
@@ -10,7 +10,7 @@ class TaskFileSerializer(serializers.ModelSerializer):
class Meta:
model = TaskFile
fields = ('file_id', 'file', 'app_name', 'parameters', 'upload_time',
- 'log_name', 'task')
+ 'log_name', 'returncode', 'task')
class TaskSerializer(serializers.HyperlinkedModelSerializer):
diff --git a/blocks/simulationAPI/tasks.py b/blocks/simulationAPI/tasks.py
index bb95a0ea..f3c4891c 100644
--- a/blocks/simulationAPI/tasks.py
+++ b/blocks/simulationAPI/tasks.py
@@ -30,10 +30,11 @@ def process_task(task_id):
output = ngspice_helper.ExecXml(file_path, file_id, parameters)
if output[0] == "Success":
file_obj.log_name = output[1]
+ file_obj.returncode = output[2]
file_obj.save()
current_task.update_state(
- state='PROGRESS',
- meta={'current_process': 'Processed Xml, Loading Output'})
+ state='STREAM',
+ meta={'current_process': 'Processed Xml, Streaming Output'})
return output[0]
except Exception as e:
diff --git a/blocks/simulationAPI/urls.py b/blocks/simulationAPI/urls.py
index eab40590..f03380d1 100644
--- a/blocks/simulationAPI/urls.py
+++ b/blocks/simulationAPI/urls.py
@@ -8,4 +8,7 @@ urlpatterns = [
path('status/<uuid:task_id>',
simulationAPI_views.CeleryResultView.as_view(), name='celery_status'),
+
+ path('stream/<uuid:task_id>',
+ simulationAPI_views.StreamView.as_view(), name='stream_status'),
]
diff --git a/blocks/simulationAPI/views.py b/blocks/simulationAPI/views.py
index 626c6175..f1056bf3 100644
--- a/blocks/simulationAPI/views.py
+++ b/blocks/simulationAPI/views.py
@@ -1,3 +1,4 @@
+from simulationAPI.models import TaskFile
from simulationAPI.serializers import TaskSerializer
from simulationAPI.tasks import process_task
from rest_framework import status
@@ -7,10 +8,26 @@ from rest_framework.permissions import AllowAny
from rest_framework.views import APIView
from rest_framework.response import Response
from celery.result import AsyncResult
+import os
import uuid
import logging
+import time
+SCILAB_INSTANCE_TIMEOUT_INTERVAL = 300
+MAX_LOG_SIZE = 512 * 1024
+LOOK_DELAY = 0.1
+
+# States of the line
+# to indicate initialization of block in log file is encountered
+INITIALIZATION = 0
+# to indicate ending of log file data for that block is encountered
+ENDING = 1
+# to indicate data is proper and can be read
+DATA = 2
+# to indicate there is no line in log file further
+NOLINE = -1
+
logger = logging.getLogger(__name__)
@@ -62,3 +79,167 @@ class CeleryResultView(APIView):
'details': celery_result.info
}
return Response(response_data)
+
+
+def parse_line(line, lineno):
+ '''
+ Function to parse the line
+ Returns tuple of figure ID and state
+ state = INITIALIZATION if new figure is created
+ ENDING if current fig end
+ DATA otherwise
+ '''
+ line_words = line.split(' ') # Each line is split to read condition
+ try:
+ # The below condition determines the block ID
+ if line_words[0] == "Initialization":
+ # New figure created
+ # Get fig id
+ # to extract figure ids (sometime multiple sinks can be used in one
+ # diagram to differentiate that)
+ figure_id = line_words[-1]
+ return (figure_id, INITIALIZATION)
+ elif line_words[0] == "Ending":
+ # Current figure end
+ # Get fig id
+ figure_id = line_words[-1]
+ return (figure_id, ENDING)
+ else:
+ # Current figure coordinates
+ figure_id = line_words[2]
+ return (figure_id, DATA)
+ except Exception as e:
+ logger.error('%s while parsing %s on line %s', str(e), line, lineno)
+ return (None, NOLINE)
+
+
+
+def get_line_and_state(file, figure_list, lineno, incomplete_line):
+ '''
+ Function to get a new line from file
+ This also parses the line and appends new figures to figure List
+ '''
+ line = file.readline() # read line by line from log
+ if not line: # if line is empty then return noline
+ return (incomplete_line, NOLINE)
+ if incomplete_line is not None:
+ line = incomplete_line + line
+ if '\n' not in line:
+ return (line, NOLINE)
+ # every line is passed to function parse_line for getting values
+ line = line.rstrip()
+ parse_result = parse_line(line, lineno)
+ figure_id = parse_result[0]
+ state = parse_result[1]
+ if state == INITIALIZATION:
+ # New figure created
+ # Add figure ID to list
+ figure_list.append(figure_id) # figure id of block is added to list
+ return (None, INITIALIZATION)
+ elif state == ENDING:
+ # End of figure
+ # Remove figure ID from list
+ # Once ending of log file/data is encountered for that block, figure id
+ # will be removed
+ figure_list.remove(figure_id)
+ return (None, ENDING)
+ elif state == NOLINE:
+ return (None, NOLINE)
+ return (line, DATA)
+
+
+class StreamView(APIView):
+ """
+
+ Streams Simulation results for 'task_id' provided after
+ uploading the xml
+ /api/stream/<uuid>
+
+ """
+ permission_classes = (AllowAny,)
+ methods = ['GET']
+
+ def get(self, request, task_id):
+ return Response(self.event_stream(task_id), content_type='text/event-stream')
+
+ def event_stream(self, task_id):
+ if not isinstance(task_id, uuid.UUID):
+ raise ValidationError('Invalid uuid format')
+
+ while True:
+ file_obj = TaskFile.objects.get(task_id=task_id)
+ log_name = file_obj.log_name
+ returncode = file_obj.returncode
+ if log_name is None and returncode is None:
+ time.sleep(LOOK_DELAY)
+ continue
+ if log_name is None or log_name[0] != '/':
+ raise ValidationError('Invalid log_name format')
+ if not os.path.isfile(log_name):
+ logger.warning('log file does not exist')
+ yield "event: ERROR\ndata: no log file found\n\n"
+ return
+ if os.stat(log_name).st_size == 0 and returncode is None:
+ time.sleep(LOOK_DELAY)
+ continue
+ break
+
+ # Open the log file
+ if os.stat(log_name).st_size == 0 and \
+ returncode is not None:
+ logger.warning('log file is empty')
+ yield "event: ERROR\ndata: log file is empty\n\n"
+ return
+
+ with open(log_name, 'r') as log_file:
+ # Start sending log
+ duplicatelineno = 0
+ duplicatelines = 0
+ lastline = ''
+ lineno = 0
+ line = None
+ endtime = time.time() + SCILAB_INSTANCE_TIMEOUT_INTERVAL
+ log_size = 0
+ figure_list = []
+
+ while time.time() <= endtime and log_size <= MAX_LOG_SIZE:
+ (line, state) = get_line_and_state(log_file, figure_list,
+ lineno, line)
+ # if incomplete line, wait for the complete line
+ if state == NOLINE:
+ gevent.sleep(LOOK_DELAY)
+ continue
+
+ if not figure_list:
+ break
+ # Get the line and loop until the state is ENDING and figure_list
+ # empty. Determine if we get block id and give it to chart.js
+ if line is None:
+ continue
+ if lastline != line:
+ if duplicatelineno != 0:
+ duplicatelines += duplicatelineno
+ yield "event: duplicate\ndata: %d\n\n" % duplicatelineno
+ duplicatelineno = 0
+ lastline = line
+ log_size += len(line)
+ if state == DATA:
+ yield "event: log\ndata: %s\n\n" % line
+ else:
+ duplicatelineno += 1
+ lineno += 1
+ line = None
+
+ if duplicatelineno != 0:
+ duplicatelines += duplicatelineno
+ yield "event: duplicate\ndata: %d\n\n" % duplicatelineno
+ duplicatelineno = 0
+
+ if duplicatelines != 0:
+ logger.info('lines = %s, duplicate lines = %s, log size = %s',
+ lineno, duplicatelines, log_size)
+ else:
+ logger.info('lines = %s, log size = %s', lineno, log_size)
+
+ # Notify Client
+ yield "event: DONE\ndata: None\n\n"