diff options
author | Suchita Lad | 2025-02-14 17:55:41 +0530 |
---|---|---|
committer | Suchita Lad | 2025-02-28 11:19:50 +0530 |
commit | 3467af723121362faaf053034446d0f6be0874d9 (patch) | |
tree | e34a6decae6e223eb5b425db51837872d0ab2a5d | |
parent | 9faefb54bcca167f807b9b58edc01533dc9fff8e (diff) | |
download | Common-Interface-Project-3467af723121362faaf053034446d0f6be0874d9.tar.gz Common-Interface-Project-3467af723121362faaf053034446d0f6be0874d9.tar.bz2 Common-Interface-Project-3467af723121362faaf053034446d0f6be0874d9.zip |
Updated celery code
-rw-r--r-- | blocks/blocks/celery_tasks.py | 49 | ||||
-rw-r--r-- | blocks/requirements.txt | 4 | ||||
-rw-r--r-- | blocks/simulationAPI/helpers/__init__.py | 0 | ||||
-rw-r--r-- | blocks/simulationAPI/helpers/scilab_manager.py | 297 |
4 files changed, 349 insertions, 1 deletions
diff --git a/blocks/blocks/celery_tasks.py b/blocks/blocks/celery_tasks.py index 79c88232..65c5aac2 100644 --- a/blocks/blocks/celery_tasks.py +++ b/blocks/blocks/celery_tasks.py @@ -1,15 +1,62 @@ +from simulationAPI.helpers.scilab_manager import ( + prestart_scilab_instances, + reap_scilab_instances, + clean_sessions_thread, + stop_scilab_instances, + clean_sessions +) import os +import redis +import django + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blocks.settings') +django.setup() +import multiprocessing + from celery import Celery +from celery.signals import worker_shutdown +import gevent + from django.conf import settings -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blocks.settings') app = Celery('blocks') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) app.conf.broker_connection_retry_on_startup = True +SCILAB_DIR = os.path.abspath(settings.SCILAB_DIR) +SCILAB = os.path.join(SCILAB_DIR, 'bin', 'scilab-adv-cli') + @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request)) + + +# Initialize Redis +redis_client = redis.Redis(host="localhost", port=6379, db=0) + +# Global keys for startup and shutdown +STARTUP_KEY = "celery_startup_done" +SHUTDOWN_KEY = "celery_shutdown_done" + + +# ✅ Ensure startup runs **only once** using Redis lock +if multiprocessing.current_process().name == "MainProcess": + if not redis_client.get(STARTUP_KEY): # Check if startup was already executed + print("🚀 Running global startup code only once!") + redis_client.set(STARTUP_KEY, "1", ex=60) # Optional: expire after 60s + else: + print("🚀 Global startup already executed, skipping.") + + +@worker_shutdown.connect +def global_shutdown_code(**kwargs): + """Ensures shutdown logic runs only once globally using Redis.""" + if multiprocessing.current_process().name == "MainProcess": + if not redis_client.get(SHUTDOWN_KEY): # Check if shutdown already executed + print("🔴 Running global shutdown code only once!") + redis_client.set(SHUTDOWN_KEY, "1", ex=60) # Optional: expire after 60s + else: + print("🔴 Global shutdown already executed, skipping.") diff --git a/blocks/requirements.txt b/blocks/requirements.txt index 48907727..a7fddbf5 100644 --- a/blocks/requirements.txt +++ b/blocks/requirements.txt @@ -23,6 +23,8 @@ djoser==2.3.1 drf-yasg==1.21.7 exceptiongroup==1.2.2 flake8==7.1.1 +gevent==24.11.1 +greenlet==3.1.1 h11==0.14.0 idna==3.7 inflection==0.5.1 @@ -64,3 +66,5 @@ wcwidth==0.2.6 webdriver-manager==4.0.2 websocket-client==1.8.0 wsproto==1.2.0 +zope.event==5.0 +zope.interface==7.2 diff --git a/blocks/simulationAPI/helpers/__init__.py b/blocks/simulationAPI/helpers/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/blocks/simulationAPI/helpers/__init__.py diff --git a/blocks/simulationAPI/helpers/scilab_manager.py b/blocks/simulationAPI/helpers/scilab_manager.py new file mode 100644 index 00000000..535b57d4 --- /dev/null +++ b/blocks/simulationAPI/helpers/scilab_manager.py @@ -0,0 +1,297 @@ +from gevent.monkey import patch_all + +patch_all(aggressive=False, subprocess=True) + +import os +import re +import time +import signal +import logging +import subprocess +import gevent +from gevent.event import Event +from gevent.lock import RLock +from threading import current_thread + + +SCILAB_MIN_INSTANCES = int(os.environ.get('SCILAB_MIN_INSTANCES', '1')) +SCILAB_MAX_INSTANCES = int(os.environ.get('SCILAB_MAX_INSTANCES', '3')) +SCILAB_START_INSTANCES = int(os.environ.get('SCILAB_START_INSTANCES', '2')) +SCILAB_INSTANCE_RETRY_INTERVAL = int(os.environ.get('SCILAB_INSTANCE_RETRY_INTERVAL', '15')) + + +# Configure logger +logger = logging.getLogger(__name__) # Create a logger for this module +logger.setLevel(logging.INFO) # Set logging level + +# Create a console handler +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) + +# Define log message format +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +console_handler.setFormatter(formatter) + +# Add the handler to the logger +logger.addHandler(console_handler) + + +class ScilabInstance: + proc = None + log_name = None + base = None + starttime = None + endtime = None + + def __init__(self): + (self.proc, self.log_name) = prestart_scilab() + + def __str__(self): + return "{pid: %s, log_name: %s}" % (self.proc.pid, self.log_name) + + +INSTANCES_1 = [] +INSTANCES_2 = [] +evt = Event() + + +def prestart_scilab(): + try: + proc = subprocess.Popen(["scilab-adv-cli", "-noatomsautoload", "-nb"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + log_name = "scilab_log.txt" + return proc, log_name + except Exception as e: + print("Error starting Scilab:", e) + return None, None + + +def no_free_scilab_instance(): + l1 = len(INSTANCES_1) + return l1 == 0 + + +def too_many_scilab_instances(): + l1 = len(INSTANCES_1) + l2 = len(INSTANCES_2) + return l1 >= SCILAB_MIN_INSTANCES or \ + l1 + l2 >= SCILAB_MAX_INSTANCES + + +def start_scilab_instances(): + l1 = len(INSTANCES_1) + l2 = len(INSTANCES_2) + lssi = min(SCILAB_START_INSTANCES, + SCILAB_MAX_INSTANCES - l2) - l1 + if lssi > 0: + logger.info('can start %s instances', lssi) + return lssi + + +def print_scilab_instances(): + l1 = len(INSTANCES_1) + l2 = len(INSTANCES_2) + msg = '' + if l1 > 0: + msg += ', free=' + str(l1) + if l2 > 0: + msg += ', in use=' + str(l2) + logger.info('instance count: %s', msg[2:]) + + +FIRST_INSTANCE = True + + +def clean_sessions_thread(): + # Ensure this function exists + print("Cleaning sessions...") + + +def clean_sessions(force=False): + print("Cleaning sessions...") + + +def prestart_scilab_instances(): + global FIRST_INSTANCE + + current_thread().name = 'PreStart' + + attempt = 1 + + while True: + while too_many_scilab_instances(): + evt.wait() + + for i in range(start_scilab_instances()): + instance = ScilabInstance() + proc = instance.proc + if proc is None: + gevent.thread.interrupt_main() + return + + if FIRST_INSTANCE: + gevent.sleep(1) + for i in range(2, 4): + if proc.poll() is not None: + break + gevent.sleep(i) + + if proc.poll() is not None: + (out, err) = proc.communicate() + out = re.sub(r'^[ !\\-]*\n', r'', out, flags=re.MULTILINE) + if out: + logger.info('=== Output from scilab console ===\n%s', + out) + if err: + logger.info('=== Error from scilab console ===\n%s', + err) + + # Check for errors in Scilab + if 'Cannot find scilab-bin' in out: + logger.critical('scilab has not been built. ' + 'Follow the installation instructions') + gevent.thread.interrupt_main() + return + + returncode = proc.returncode + msg = 'attempts' if attempt != 1 else 'attempt' + + if attempt >= 4: + logger.critical('aborting after %s %s: rc = %s', + attempt, msg, returncode) + gevent.thread.interrupt_main() + return + + logger.error('retrying after %s %s: rc = %s', + attempt, msg, returncode) + gevent.sleep(SCILAB_INSTANCE_RETRY_INTERVAL * attempt) + attempt += 1 + FIRST_INSTANCE = True + continue + + INSTANCES_1.append(instance) + attempt = 1 + FIRST_INSTANCE = False + + print_scilab_instances() + + if too_many_scilab_instances(): + evt.clear() + + +def get_scilab_instance(): + global FIRST_INSTANCE + + try: + while True: + instance = INSTANCES_1.pop(0) + proc = instance.proc + if proc.poll() is not None: + logger.warning('scilab instance exited: return code is %s', + proc.returncode) + FIRST_INSTANCE = True + if not too_many_scilab_instances(): + evt.set() + if no_free_scilab_instance(): + gevent.sleep(4) + continue + + INSTANCES_2.append(instance) + print_scilab_instances() + if not too_many_scilab_instances(): + evt.set() + + return instance + except IndexError: + logger.error('No free instance') + return None + + +def remove_scilab_instance(instance): + try: + INSTANCES_2.remove(instance) + print_scilab_instances() + if not too_many_scilab_instances(): + evt.set() + except ValueError: + logger.error('could not find instance %s', instance) + + +def stop_scilab_instance(base, createlogfile=False): + stop_instance(base.instance, createlogfile) + + base.instance = None + + +def kill_scilab_with(proc, sig): + """Send a signal to a Scilab process.""" + try: + if proc and proc.pid: + os.kill(proc.pid, sig) + return True + except ProcessLookupError: + print(f"Process {proc.pid} not found.") + except Exception as e: + print(f"Error killing Scilab process: {e}") + return False + + +def stop_instance(instance, createlogfile=False, removeinstance=True): + if instance is None: + logger.warning('no instance') + return + + if not kill_scilab_with(instance.proc, signal.SIGTERM): + kill_scilab_with(instance.proc, signal.SIGKILL) + + if removeinstance: + remove_scilab_instance(instance) + + if instance.log_name is None: + if createlogfile: + logger.warning('empty diagram') + else: + # remove(instance.log_name) + instance.log_name = None + + instance.base = None + + +def stop_scilab_instances(): + if len(INSTANCES_1) > 0: + logger.info('stopping %s idle instances', len(INSTANCES_1)) + while len(INSTANCES_1) > 0: + instance = INSTANCES_1.pop() + stop_instance(instance, removeinstance=False) + + if len(INSTANCES_2) > 0: + logger.info('stopping %s busy instances', len(INSTANCES_2)) + while len(INSTANCES_2) > 0: + instance = INSTANCES_2.pop() + stop_instance(instance, removeinstance=False) + + +def reap_scilab_instances(): + current_thread().name = 'Reaper' + while True: + gevent.sleep(100) + + remove_instances = [] + + for instance in INSTANCES_2: + if instance.endtime < time(): + remove_instances.append(instance) + + count = len(remove_instances) + if count == 0: + continue + + logger.info('removing %s stale instances', count) + for instance in remove_instances: + base = instance.base + if base is None: + logger.warning('cannot stop instance %s', instance) + stop_instance(instance) + else: + logger.warning('cannot stop instance %s', instance) + stop_instance(instance) |