summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSuchita Lad2025-02-14 17:55:41 +0530
committerSuchita Lad2025-02-28 11:19:50 +0530
commit3467af723121362faaf053034446d0f6be0874d9 (patch)
treee34a6decae6e223eb5b425db51837872d0ab2a5d
parent9faefb54bcca167f807b9b58edc01533dc9fff8e (diff)
downloadCommon-Interface-Project-3467af723121362faaf053034446d0f6be0874d9.tar.gz
Common-Interface-Project-3467af723121362faaf053034446d0f6be0874d9.tar.bz2
Common-Interface-Project-3467af723121362faaf053034446d0f6be0874d9.zip
Updated celery code
-rw-r--r--blocks/blocks/celery_tasks.py49
-rw-r--r--blocks/requirements.txt4
-rw-r--r--blocks/simulationAPI/helpers/__init__.py0
-rw-r--r--blocks/simulationAPI/helpers/scilab_manager.py297
4 files changed, 349 insertions, 1 deletions
diff --git a/blocks/blocks/celery_tasks.py b/blocks/blocks/celery_tasks.py
index 79c88232..65c5aac2 100644
--- a/blocks/blocks/celery_tasks.py
+++ b/blocks/blocks/celery_tasks.py
@@ -1,15 +1,62 @@
+from simulationAPI.helpers.scilab_manager import (
+ prestart_scilab_instances,
+ reap_scilab_instances,
+ clean_sessions_thread,
+ stop_scilab_instances,
+ clean_sessions
+)
import os
+import redis
+import django
+
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blocks.settings')
+django.setup()
+import multiprocessing
+
from celery import Celery
+from celery.signals import worker_shutdown
+import gevent
+
from django.conf import settings
-os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blocks.settings')
app = Celery('blocks')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.broker_connection_retry_on_startup = True
+SCILAB_DIR = os.path.abspath(settings.SCILAB_DIR)
+SCILAB = os.path.join(SCILAB_DIR, 'bin', 'scilab-adv-cli')
+
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
+
+
+# Initialize Redis
+redis_client = redis.Redis(host="localhost", port=6379, db=0)
+
+# Global keys for startup and shutdown
+STARTUP_KEY = "celery_startup_done"
+SHUTDOWN_KEY = "celery_shutdown_done"
+
+
+# ✅ Ensure startup runs **only once** using Redis lock
+if multiprocessing.current_process().name == "MainProcess":
+ if not redis_client.get(STARTUP_KEY): # Check if startup was already executed
+ print("🚀 Running global startup code only once!")
+ redis_client.set(STARTUP_KEY, "1", ex=60) # Optional: expire after 60s
+ else:
+ print("🚀 Global startup already executed, skipping.")
+
+
+@worker_shutdown.connect
+def global_shutdown_code(**kwargs):
+ """Ensures shutdown logic runs only once globally using Redis."""
+ if multiprocessing.current_process().name == "MainProcess":
+ if not redis_client.get(SHUTDOWN_KEY): # Check if shutdown already executed
+ print("🔴 Running global shutdown code only once!")
+ redis_client.set(SHUTDOWN_KEY, "1", ex=60) # Optional: expire after 60s
+ else:
+ print("🔴 Global shutdown already executed, skipping.")
diff --git a/blocks/requirements.txt b/blocks/requirements.txt
index 48907727..a7fddbf5 100644
--- a/blocks/requirements.txt
+++ b/blocks/requirements.txt
@@ -23,6 +23,8 @@ djoser==2.3.1
drf-yasg==1.21.7
exceptiongroup==1.2.2
flake8==7.1.1
+gevent==24.11.1
+greenlet==3.1.1
h11==0.14.0
idna==3.7
inflection==0.5.1
@@ -64,3 +66,5 @@ wcwidth==0.2.6
webdriver-manager==4.0.2
websocket-client==1.8.0
wsproto==1.2.0
+zope.event==5.0
+zope.interface==7.2
diff --git a/blocks/simulationAPI/helpers/__init__.py b/blocks/simulationAPI/helpers/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/blocks/simulationAPI/helpers/__init__.py
diff --git a/blocks/simulationAPI/helpers/scilab_manager.py b/blocks/simulationAPI/helpers/scilab_manager.py
new file mode 100644
index 00000000..535b57d4
--- /dev/null
+++ b/blocks/simulationAPI/helpers/scilab_manager.py
@@ -0,0 +1,297 @@
+from gevent.monkey import patch_all
+
+patch_all(aggressive=False, subprocess=True)
+
+import os
+import re
+import time
+import signal
+import logging
+import subprocess
+import gevent
+from gevent.event import Event
+from gevent.lock import RLock
+from threading import current_thread
+
+
+SCILAB_MIN_INSTANCES = int(os.environ.get('SCILAB_MIN_INSTANCES', '1'))
+SCILAB_MAX_INSTANCES = int(os.environ.get('SCILAB_MAX_INSTANCES', '3'))
+SCILAB_START_INSTANCES = int(os.environ.get('SCILAB_START_INSTANCES', '2'))
+SCILAB_INSTANCE_RETRY_INTERVAL = int(os.environ.get('SCILAB_INSTANCE_RETRY_INTERVAL', '15'))
+
+
+# Configure logger
+logger = logging.getLogger(__name__) # Create a logger for this module
+logger.setLevel(logging.INFO) # Set logging level
+
+# Create a console handler
+console_handler = logging.StreamHandler()
+console_handler.setLevel(logging.INFO)
+
+# Define log message format
+formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+console_handler.setFormatter(formatter)
+
+# Add the handler to the logger
+logger.addHandler(console_handler)
+
+
+class ScilabInstance:
+ proc = None
+ log_name = None
+ base = None
+ starttime = None
+ endtime = None
+
+ def __init__(self):
+ (self.proc, self.log_name) = prestart_scilab()
+
+ def __str__(self):
+ return "{pid: %s, log_name: %s}" % (self.proc.pid, self.log_name)
+
+
+INSTANCES_1 = []
+INSTANCES_2 = []
+evt = Event()
+
+
+def prestart_scilab():
+ try:
+ proc = subprocess.Popen(["scilab-adv-cli", "-noatomsautoload", "-nb"],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ log_name = "scilab_log.txt"
+ return proc, log_name
+ except Exception as e:
+ print("Error starting Scilab:", e)
+ return None, None
+
+
+def no_free_scilab_instance():
+ l1 = len(INSTANCES_1)
+ return l1 == 0
+
+
+def too_many_scilab_instances():
+ l1 = len(INSTANCES_1)
+ l2 = len(INSTANCES_2)
+ return l1 >= SCILAB_MIN_INSTANCES or \
+ l1 + l2 >= SCILAB_MAX_INSTANCES
+
+
+def start_scilab_instances():
+ l1 = len(INSTANCES_1)
+ l2 = len(INSTANCES_2)
+ lssi = min(SCILAB_START_INSTANCES,
+ SCILAB_MAX_INSTANCES - l2) - l1
+ if lssi > 0:
+ logger.info('can start %s instances', lssi)
+ return lssi
+
+
+def print_scilab_instances():
+ l1 = len(INSTANCES_1)
+ l2 = len(INSTANCES_2)
+ msg = ''
+ if l1 > 0:
+ msg += ', free=' + str(l1)
+ if l2 > 0:
+ msg += ', in use=' + str(l2)
+ logger.info('instance count: %s', msg[2:])
+
+
+FIRST_INSTANCE = True
+
+
+def clean_sessions_thread():
+ # Ensure this function exists
+ print("Cleaning sessions...")
+
+
+def clean_sessions(force=False):
+ print("Cleaning sessions...")
+
+
+def prestart_scilab_instances():
+ global FIRST_INSTANCE
+
+ current_thread().name = 'PreStart'
+
+ attempt = 1
+
+ while True:
+ while too_many_scilab_instances():
+ evt.wait()
+
+ for i in range(start_scilab_instances()):
+ instance = ScilabInstance()
+ proc = instance.proc
+ if proc is None:
+ gevent.thread.interrupt_main()
+ return
+
+ if FIRST_INSTANCE:
+ gevent.sleep(1)
+ for i in range(2, 4):
+ if proc.poll() is not None:
+ break
+ gevent.sleep(i)
+
+ if proc.poll() is not None:
+ (out, err) = proc.communicate()
+ out = re.sub(r'^[ !\\-]*\n', r'', out, flags=re.MULTILINE)
+ if out:
+ logger.info('=== Output from scilab console ===\n%s',
+ out)
+ if err:
+ logger.info('=== Error from scilab console ===\n%s',
+ err)
+
+ # Check for errors in Scilab
+ if 'Cannot find scilab-bin' in out:
+ logger.critical('scilab has not been built. '
+ 'Follow the installation instructions')
+ gevent.thread.interrupt_main()
+ return
+
+ returncode = proc.returncode
+ msg = 'attempts' if attempt != 1 else 'attempt'
+
+ if attempt >= 4:
+ logger.critical('aborting after %s %s: rc = %s',
+ attempt, msg, returncode)
+ gevent.thread.interrupt_main()
+ return
+
+ logger.error('retrying after %s %s: rc = %s',
+ attempt, msg, returncode)
+ gevent.sleep(SCILAB_INSTANCE_RETRY_INTERVAL * attempt)
+ attempt += 1
+ FIRST_INSTANCE = True
+ continue
+
+ INSTANCES_1.append(instance)
+ attempt = 1
+ FIRST_INSTANCE = False
+
+ print_scilab_instances()
+
+ if too_many_scilab_instances():
+ evt.clear()
+
+
+def get_scilab_instance():
+ global FIRST_INSTANCE
+
+ try:
+ while True:
+ instance = INSTANCES_1.pop(0)
+ proc = instance.proc
+ if proc.poll() is not None:
+ logger.warning('scilab instance exited: return code is %s',
+ proc.returncode)
+ FIRST_INSTANCE = True
+ if not too_many_scilab_instances():
+ evt.set()
+ if no_free_scilab_instance():
+ gevent.sleep(4)
+ continue
+
+ INSTANCES_2.append(instance)
+ print_scilab_instances()
+ if not too_many_scilab_instances():
+ evt.set()
+
+ return instance
+ except IndexError:
+ logger.error('No free instance')
+ return None
+
+
+def remove_scilab_instance(instance):
+ try:
+ INSTANCES_2.remove(instance)
+ print_scilab_instances()
+ if not too_many_scilab_instances():
+ evt.set()
+ except ValueError:
+ logger.error('could not find instance %s', instance)
+
+
+def stop_scilab_instance(base, createlogfile=False):
+ stop_instance(base.instance, createlogfile)
+
+ base.instance = None
+
+
+def kill_scilab_with(proc, sig):
+ """Send a signal to a Scilab process."""
+ try:
+ if proc and proc.pid:
+ os.kill(proc.pid, sig)
+ return True
+ except ProcessLookupError:
+ print(f"Process {proc.pid} not found.")
+ except Exception as e:
+ print(f"Error killing Scilab process: {e}")
+ return False
+
+
+def stop_instance(instance, createlogfile=False, removeinstance=True):
+ if instance is None:
+ logger.warning('no instance')
+ return
+
+ if not kill_scilab_with(instance.proc, signal.SIGTERM):
+ kill_scilab_with(instance.proc, signal.SIGKILL)
+
+ if removeinstance:
+ remove_scilab_instance(instance)
+
+ if instance.log_name is None:
+ if createlogfile:
+ logger.warning('empty diagram')
+ else:
+ # remove(instance.log_name)
+ instance.log_name = None
+
+ instance.base = None
+
+
+def stop_scilab_instances():
+ if len(INSTANCES_1) > 0:
+ logger.info('stopping %s idle instances', len(INSTANCES_1))
+ while len(INSTANCES_1) > 0:
+ instance = INSTANCES_1.pop()
+ stop_instance(instance, removeinstance=False)
+
+ if len(INSTANCES_2) > 0:
+ logger.info('stopping %s busy instances', len(INSTANCES_2))
+ while len(INSTANCES_2) > 0:
+ instance = INSTANCES_2.pop()
+ stop_instance(instance, removeinstance=False)
+
+
+def reap_scilab_instances():
+ current_thread().name = 'Reaper'
+ while True:
+ gevent.sleep(100)
+
+ remove_instances = []
+
+ for instance in INSTANCES_2:
+ if instance.endtime < time():
+ remove_instances.append(instance)
+
+ count = len(remove_instances)
+ if count == 0:
+ continue
+
+ logger.info('removing %s stale instances', count)
+ for instance in remove_instances:
+ base = instance.base
+ if base is None:
+ logger.warning('cannot stop instance %s', instance)
+ stop_instance(instance)
+ else:
+ logger.warning('cannot stop instance %s', instance)
+ stop_instance(instance)