path: root/blocks/simulationAPI/tasks.py
blob: c76f5e9159dcf556cb0d199fa2ac15762e5b0123
from celery import shared_task
from celery.utils.log import get_task_logger
from redis import Redis
from threading import current_thread

from blocks.celery_tasks import app
from simulationAPI.helpers.ngspice_helper import CannotRunParser, ExecXml, update_task_status
from simulationAPI.models import Task
from simulationAPI.helpers.scilab_manager import uploadscript, getscriptoutput, kill_scilab

logger = get_task_logger(__name__)


def acquire_lock(session_id, timeout=1800):
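    """Acquire a blocking lock for the given session.

    The lock lives on the Redis instance that backs the Celery broker and
    is keyed by the session id. It expires after ``timeout`` seconds
    (30 minutes by default) so a crashed worker cannot hold it forever.
    """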
    redis_client = Redis.from_url(app.conf.broker_url)
    lock = redis_client.lock(f"simulation_lock:{session_id}", timeout=timeout)
    lock.acquire(blocking=True)
    return lock


def release_lock(lock):
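    """Release a lock previously obtained from acquire_lock()."""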
    lock.release()


@shared_task(bind=True)
def process_task(self, task_id):
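    """Run a simulation task.

    SCRIPT tasks are handed to the Scilab manager, which uploads the
    script and collects its output; all other tasks go through ExecXml,
    which either streams results or returns them when finished. The
    per-session Redis lock serialises tasks that belong to the same session.
    """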
    task = Task.objects.get(task_id=task_id)
    session_id = task.session.session_id
    task_type = task.type
    current_thread().name = f"{session_id[:6]}:{task_id[:8]}"
    lock = acquire_lock(session_id)  # Prevent multiple runs per session

    try:
        logger.info("Processing %s %s %s %s",
                    task_id, task.file.path, task.session.app_name, task.workspace_file)

        state = 'STARTED'
        status = 'Started Processing File'
        update_task_status(self, task_id, state, meta={'status': status})

        if task_type == 'SCRIPT':
            # Upload the script to Scilab first, then collect its output.
            uploadscript(task.session, task)
            output = getscriptoutput(task.session, task)
            state = 'SUCCESS'
            update_task_status(self, task_id, state, meta=output)
        else:
            output = ExecXml(task, self.name, task.workspace_file)
            if output == "Streaming":
                state = 'STREAMING'
                status = 'Processed Xml, Streaming Output'
            elif output == "Success":
                state = 'SUCCESS'
                status = 'Processed Xml, Loading Output'
            else:
                logger.error('%s', output)
                raise CannotRunParser(output)

            update_task_status(self, task_id, state, meta={'status': status})

        return output

    finally:
        release_lock(lock)  # Ensure lock is always released


@shared_task(bind=True)
def kill_task(self, task_id):
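    """Ask the Scilab manager to terminate the process running this task."""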
    task = Task.objects.get(task_id=task_id)
    logger.info("Killing task %s", task)
    kill_scilab(None, task.session, task)
    return True
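
# Usage sketch (illustrative only, not part of this module): a caller such as
# a Django view could enqueue a simulation and later request its termination,
# assuming `task` is a saved simulationAPI.models.Task instance:
#
#     from simulationAPI.tasks import process_task, kill_task
#
#     process_task.delay(str(task.task_id))   # run asynchronously on a worker
#     kill_task.delay(str(task.task_id))      # stop the associated Scilab run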