from celery import shared_task, states
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from redis import Redis
from threading import current_thread

from blocks.celery_tasks import app
from simulationAPI.helpers.ngspice_helper import (
    CannotRunParser,
    ExecXml,
    update_task_status,
)
from simulationAPI.models import Task
from simulationAPI.helpers.scilab_manager import (
    getscriptoutput,
    kill_scilab,
    uploadscript,
)

logger = get_task_logger(__name__)


def acquire_lock(session_id, timeout=1800):
    """Acquire a per-session Redis lock and return it.

    Blocks until the lock is free, so at most one simulation runs per
    session at a time. The lock auto-expires after ``timeout`` seconds
    (default 30 min) so a crashed worker cannot hold it forever.

    Args:
        session_id: Session identifier used to namespace the lock key.
        timeout: Lock auto-expiry in seconds.

    Returns:
        The acquired ``redis.lock.Lock`` instance.
    """
    redis_client = Redis.from_url(app.conf.broker_url)
    lock = redis_client.lock(f"simulation_lock:{session_id}", timeout=timeout)
    lock.acquire(blocking=True)
    return lock


def release_lock(lock):
    """Release a lock previously returned by ``acquire_lock``.

    NOTE(review): if the task ran longer than the lock timeout, the lock
    has already expired and redis-py raises ``LockError`` here, which
    would mask the task's real outcome — confirm whether that case
    should be tolerated (e.g. ``lock.release()`` guarded by
    ``lock.owned()`` or a try/except).
    """
    lock.release()


@shared_task(bind=True)
def process_task(self, task_id):
    """Process one simulation Task, serialized per session via a Redis lock.

    Looks up the ``Task`` row, then either runs a Scilab script
    (``type == 'SCRIPT'``) or executes the XML diagram via ``ExecXml``,
    reporting progress through ``update_task_status``.

    Args:
        self: Bound Celery task instance (``bind=True``).
        task_id: Primary key of the ``Task`` to process.

    Returns:
        The script output dict (SCRIPT path) or the ``ExecXml`` result
        string ("Streaming" / "Success").

    Raises:
        CannotRunParser: if ``ExecXml`` returns anything other than
            "Streaming" or "Success".
    """
    task = Task.objects.get(task_id=task_id)
    session_id = task.session.session_id
    task_type = task.type
    # Tag the worker thread so log lines identify session + task.
    current_thread().name = f"{session_id[:6]}:{task_id[:8]}"
    lock = acquire_lock(session_id)  # Prevent multiple runs per session
    try:
        logger.info("Processing %s %s %s %s", task_id, task.file.path,
                    task.session.app_name, task.workspace_file)
        state = 'STARTED'
        status = 'Started Processing File'
        update_task_status(self, task_id, state, meta={'status': status})
        if task_type == 'SCRIPT':
            # uploadscript is called for its side effect only; its return
            # value was previously assigned to `output` and immediately
            # overwritten, so the dead store is dropped here.
            uploadscript(task.session, task)
            output = getscriptoutput(task.session, task)
            state = 'SUCCESS'
            update_task_status(self, task_id, state, meta=output)
        else:
            output = ExecXml(task, self.name, task.workspace_file)
            if output == "Streaming":
                state = 'STREAMING'
                status = 'Processed Xml, Streaming Output'
            elif output == "Success":
                state = 'SUCCESS'
                status = 'Processed Xml, Loading Output'
            else:
                logger.error('%s', output)
                raise CannotRunParser(output)
            update_task_status(self, task_id, state, meta={'status': status})
        return output
    finally:
        release_lock(lock)  # Ensure lock is always released


@shared_task(bind=True)
def kill_task(self, task_id):
    """Terminate the Scilab process associated with the given Task.

    Args:
        self: Bound Celery task instance (``bind=True``).
        task_id: Primary key of the ``Task`` whose Scilab run to kill.

    Returns:
        True unconditionally once ``kill_scilab`` has been invoked.
    """
    task = Task.objects.get(task_id=task_id)
    logger.info("Killing task %s", task)
    kill_scilab(None, task.session, task)
    return True