1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
from celery import shared_task, states
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from redis import Redis
from threading import current_thread
from blocks.celery_tasks import app
from simulationAPI.helpers.ngspice_helper import CannotRunParser, ExecXml, update_task_status
from simulationAPI.models import Task
from simulationAPI.helpers.scilab_manager import uploadscript, getscriptoutput, kill_scilab
logger = get_task_logger(__name__)
def acquire_lock(session_id, timeout=1800):
redis_client = Redis.from_url(app.conf.broker_url)
lock = redis_client.lock(f"simulation_lock:{session_id}", timeout=timeout)
lock.acquire(blocking=True)
return lock
def release_lock(lock):
lock.release()
@shared_task(bind=True)
def process_task(self, task_id):
task = Task.objects.get(task_id=task_id)
session_id = task.session.session_id
task_type = task.type
current_thread().name = f"{session_id[:6]}:{task_id[:8]}"
lock = acquire_lock(session_id) # Prevent multiple runs per session
try:
logger.info("Processing %s %s %s %s",
task_id, task.file.path, task.session.app_name, task.workspace_file)
state = 'STARTED'
status = 'Started Processing File'
update_task_status(self, task_id, state, meta={'status': status})
if task_type == 'SCRIPT':
output = uploadscript(task.session, task)
output = getscriptoutput(task.session, task)
state = 'SUCCESS'
update_task_status(self, task_id, state, meta=output)
else:
output = ExecXml(task, self.name, task.workspace_file)
if output == "Streaming":
state = 'STREAMING'
status = 'Processed Xml, Streaming Output'
elif output == "Success":
state = 'SUCCESS'
status = 'Processed Xml, Loading Output'
else:
logger.error('%s', output)
raise CannotRunParser(output)
update_task_status(self, task_id, state, meta={'status': status})
return output
finally:
release_lock(lock) # Ensure lock is always released
@shared_task(bind=True)
def kill_task(self, task_id):
task = Task.objects.get(task_id=task_id)
logger.info("Killing task %s", task)
kill_scilab(None, task.session, task)
return True
|