AkamaSoft

📝 Introduction: This detailed guide explains how to create, configure, and run multiple Celery Beat instances (Instance A and Instance B) within a single Django project. It covers how to synchronize them through Redis/RabbitMQ and how to avoid execution conflicts.

📋 Table of Contents

  1. Installation and Configuration
  2. Django Project Structure
  3. Django Models for Instance Management
  4. Instance A Configuration (High Priority)
  5. Instance B Configuration (Low Priority)
  6. Detailed Redis Configuration
  7. Detailed RabbitMQ Configuration
  8. Synchronization and Locking
  9. Execution and Deployment
  10. Monitoring and Management
    📦 Installation and Configuration

    Required Dependencies

    # requirements.txt
    Django==4.2.7
    celery==5.3.4
    django-celery-beat==2.5.0
    redis==5.0.1
    kombu==5.3.4
    pika==1.3.2
    python-decouple==3.8
    psycopg2-binary==2.9.9
    supervisor==4.2.5

    Installing the Packages

    # Install the Python dependencies
    pip install -r requirements.txt

    # Install Redis
    sudo apt update
    sudo apt install redis-server
    sudo systemctl start redis-server
    sudo systemctl enable redis-server

    # Install RabbitMQ and enable its management plugin
    sudo apt install rabbitmq-server
    sudo systemctl start rabbitmq-server
    sudo systemctl enable rabbitmq-server
    sudo rabbitmq-plugins enable rabbitmq_management
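
Once both services are started, a quick way to confirm that they accept connections is to probe their TCP ports. The sketch below is only an illustration and is not part of the original project: `port_is_open` is a hypothetical helper, and it assumes both services run on localhost with their default ports (6379 for Redis, 5672 for RabbitMQ's AMQP listener).

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


if __name__ == "__main__":
    # Default ports are an assumption; adjust them to your own configuration.
    for name, port in (("Redis", 6379), ("RabbitMQ", 5672)):
        state = "reachable" if port_is_open("localhost", port) else "unreachable"
        print(f"{name} on port {port}: {state}")
```

An open port only shows that something is listening; it does not prove that authentication or the Celery configuration is correct.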

    🏗️ Django Project Structure

    my_django_project/
    ├── manage.py
    ├── requirements.txt
    ├── .env
    ├── my_project/
    │   ├── __init__.py
    │   ├── settings/
    │   │   ├── __init__.py
    │   │   ├── base.py
    │   │   ├── redis.py
    │   │   ├── rabbitmq.py
    │   │   └── production.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── apps/
    │   ├── __init__.py
    │   ├── celery_app/
    │   │   ├── __init__.py
    │   │   ├── celery.py
    │   │   ├── models.py
    │   │   ├── tasks.py
    │   │   ├── admin.py
    │   │   ├── redis_sync.py
    │   │   ├── rabbitmq_sync.py
    │   │   └── management/
    │   │       └── commands/
    │   │           ├── __init__.py
    │   │           ├── start_beat_instance.py
    │   │           ├── manage_instances.py
    │   │           └── test_sync.py
    │   └── notifications/
    │       ├── models.py
    │       └── tasks.py
    ├── static/
    ├── media/
    ├── logs/
    ├── scripts/
    │   ├── start_instance_a.sh
    │   ├── start_instance_b.sh
    │   └── monitor_instances.sh
    └── docker-compose.yml

    🗄️ Django Models for Instance Management

    Beat Instance Model

    # apps/celery_app/models.py
    import uuid

    from django.db import models
    from django.utils import timezone


    class BeatInstance(models.Model):
        """Tracks a Celery Beat instance."""
        INSTANCE_TYPES = [
            ('instance_a', 'Instance A - High Priority'),
            ('instance_b', 'Instance B - Low Priority'),
        ]

        STATUS_CHOICES = [
            ('active', 'Active'),
            ('inactive', 'Inactive'),
            ('error', 'Error'),
        ]

        id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
        name = models.CharField(max_length=100, unique=True)
        instance_type = models.CharField(max_length=20, choices=INSTANCE_TYPES)
        queue_name = models.CharField(max_length=50)
        status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='active')
        broker_type = models.CharField(max_length=10, choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')], default='redis')
        last_heartbeat = models.DateTimeField(default=timezone.now)
        created_at = models.DateTimeField(auto_now_add=True)
        updated_at = models.DateTimeField(auto_now=True)
        config_data = models.JSONField(default=dict, blank=True)

        class Meta:
            verbose_name = "Beat Instance"
            verbose_name_plural = "Beat Instances"
            ordering = ['-last_heartbeat']

        def __str__(self):
            return f"{self.name} ({self.instance_type}) - {self.broker_type}"

        def is_online(self):
            """Return True if a heartbeat was recorded within the last 5 minutes."""
            time_diff = timezone.now() - self.last_heartbeat
            return time_diff.total_seconds() < 300  # 5 minutes

        def get_config(self):
            """Return the instance configuration."""
            return self.config_data

        def set_config(self, config_dict):
            """Store the instance configuration."""
            self.config_data = config_dict
            self.save(update_fields=['config_data'])


    class BeatTaskLock(models.Model):
        """Records the task locks held by Beat instances."""
        task_name = models.CharField(max_length=255)
        instance = models.ForeignKey(BeatInstance, on_delete=models.CASCADE)
        lock_key = models.CharField(max_length=100, unique=True)
        broker_type = models.CharField(max_length=10, choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')])
        acquired_at = models.DateTimeField(auto_now_add=True)
        expires_at = models.DateTimeField()
        is_active = models.BooleanField(default=True)

        class Meta:
            verbose_name = "Beat Task Lock"
            verbose_name_plural = "Beat Task Locks"
            indexes = [
                models.Index(fields=['lock_key', 'expires_at']),
                models.Index(fields=['instance', 'is_active']),
                models.Index(fields=['broker_type']),
            ]

        def __str__(self):
            return f"{self.task_name} - {self.instance.name} ({self.broker_type})"

        def is_expired(self):
            """Return True if the lock has passed its expiry time."""
            return timezone.now() > self.expires_at


    class BeatTaskExecution(models.Model):
        """Traces each execution of a scheduled task."""
        task_name = models.CharField(max_length=255)
        instance = models.ForeignKey(BeatInstance, on_delete=models.CASCADE)
        started_at = models.DateTimeField(auto_now_add=True)
        completed_at = models.DateTimeField(null=True, blank=True)
        status = models.CharField(
            max_length=20,
            choices=[
                ('running', 'Running'),
                ('completed', 'Completed'),
                ('failed', 'Failed'),
                ('skipped', 'Skipped'),
            ],
            default='running'
        )
        result = models.TextField(blank=True)
        error_message = models.TextField(blank=True)
        execution_time = models.FloatField(null=True, blank=True)
        broker_type = models.CharField(max_length=10, choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')])

        class Meta:
            verbose_name = "Beat Task Execution"
            verbose_name_plural = "Beat Task Executions"
            ordering = ['-started_at']

        def __str__(self):
            return f"{self.task_name} - {self.instance.name} ({self.broker_type})"

        def mark_completed(self, result=None):
            """Mark the execution as completed and record its duration."""
            self.status = 'completed'
            self.completed_at = timezone.now()
            # Compare with None so falsy results such as 0 or [] are still stored
            if result is not None:
                self.result = str(result)

            if self.started_at:
                self.execution_time = (self.completed_at - self.started_at).total_seconds()

            self.save()

        def mark_failed(self, error_message=None):
            """Mark the execution as failed."""
            self.status = 'failed'
            self.completed_at = timezone.now()
            if error_message:
                self.error_message = str(error_message)

            self.save()
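
The `is_online` check in `BeatInstance` comes down to comparing the age of the last heartbeat with a fixed threshold. Here is a minimal, Django-free sketch of that rule, assuming timezone-aware datetimes. The function name `is_heartbeat_fresh` and its `max_age_seconds` parameter are illustrative and do not appear in the project.

```python
from datetime import datetime, timedelta, timezone


def is_heartbeat_fresh(last_heartbeat, now=None, max_age_seconds=300):
    """Return True if last_heartbeat is less than max_age_seconds old."""
    now = now or datetime.now(timezone.utc)
    return (now - last_heartbeat).total_seconds() < max_age_seconds


# A fixed reference time keeps the example deterministic
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(is_heartbeat_fresh(now - timedelta(seconds=120), now=now))  # True: 2 minutes old
print(is_heartbeat_fresh(now - timedelta(seconds=600), now=now))  # False: 10 minutes old
```

Because the comparison is strict, a heartbeat that is exactly 300 seconds old already counts as offline.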

    🔴 Detailed Redis Configuration

    Redis Settings

    # my_project/settings/redis.py
    from celery.schedules import crontab
    from decouple import config
    from kombu import Queue

    # Redis connection settings (cast to int because .env values are strings)
    REDIS_HOST = config('REDIS_HOST', default='localhost')
    REDIS_PORT = config('REDIS_PORT', default=6379, cast=int)
    REDIS_PASSWORD = config('REDIS_PASSWORD', default='')
    REDIS_DB = config('REDIS_DB', default=0, cast=int)

    # Redis URL, with credentials when a password is set
    REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
    if REDIS_PASSWORD:
        REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"

    # Celery broker and result backend on Redis
    CELERY_BROKER_URL = REDIS_URL
    CELERY_RESULT_BACKEND = REDIS_URL

    # Per-instance Redis settings used by the synchronization service
    REDIS_BEAT_CONFIG = {
        'instance_a': {
            'database': 1,  # Separate DB for Instance A
            'key_prefix': 'beat_a:',
            'lock_timeout': 300,  # 5 minutes
            'heartbeat_interval': 30,  # 30 seconds
        },
        'instance_b': {
            'database': 2,  # Separate DB for Instance B
            'key_prefix': 'beat_b:',
            'lock_timeout': 600,  # 10 minutes
            'heartbeat_interval': 60,  # 1 minute
        },
    }

    # Route each task to the queue of the instance that owns it
    CELERY_TASK_ROUTES = {
        'apps.celery_app.tasks.health_check': {'queue': 'instance_a_queue'},
        'apps.celery_app.tasks.system_monitor': {'queue': 'instance_a_queue'},
        'apps.celery_app.tasks.critical_alerts': {'queue': 'instance_a_queue'},
        'apps.celery_app.tasks.daily_cleanup': {'queue': 'instance_b_queue'},
        'apps.celery_app.tasks.weekly_reports': {'queue': 'instance_b_queue'},
        'apps.celery_app.tasks.data_export': {'queue': 'instance_b_queue'},
    }

    # Queue declarations
    CELERY_TASK_QUEUES = (
        Queue('instance_a_queue', routing_key='instance_a'),
        Queue('instance_b_queue', routing_key='instance_b'),
        Queue('default', routing_key='default'),
    )

    # Beat scheduler: schedules are stored in the Django database
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
    CELERY_BEAT_SYNC_EVERY = 1

    # Per-instance schedules. This is a project-specific setting:
    # Celery does not read REDIS_BEAT_SCHEDULE on its own.
    REDIS_BEAT_SCHEDULE = {
        'instance_a': {
            'health-check-instance-a': {
                'task': 'apps.celery_app.tasks.health_check',
                'schedule': 30.0,  # every 30 seconds
                'options': {'queue': 'instance_a_queue'}
            },
            'system-monitor-instance-a': {
                'task': 'apps.celery_app.tasks.system_monitor',
                'schedule': 60.0,  # every minute
                'options': {'queue': 'instance_a_queue'}
            },
            'critical-alerts-instance-a': {
                'task': 'apps.celery_app.tasks.critical_alerts',
                'schedule': 120.0,  # every 2 minutes
                'options': {'queue': 'instance_a_queue'}
            },
        },
        'instance_b': {
            'daily-cleanup-instance-b': {
                'task': 'apps.celery_app.tasks.daily_cleanup',
                'schedule': crontab(hour=2, minute=0),  # every day at 02:00
                'options': {'queue': 'instance_b_queue'}
            },
            'weekly-reports-instance-b': {
                'task': 'apps.celery_app.tasks.weekly_reports',
                'schedule': crontab(hour=6, minute=0, day_of_week=1),  # Mondays at 06:00
                'options': {'queue': 'instance_b_queue'}
            },
            'data-export-instance-b': {
                'task': 'apps.celery_app.tasks.data_export',
                'schedule': crontab(hour=4, minute=0, day_of_week=0),  # Sundays at 04:00
                'options': {'queue': 'instance_b_queue'}
            },
        },
    }
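
Since `REDIS_BEAT_SCHEDULE` groups the entries by instance type, each Beat process has to pick its own sub-dictionary before using it. One possible way to do this is sketched below with plain dictionaries. `select_schedule` is a hypothetical helper, and `SAMPLE_SCHEDULES` is a shortened stand-in for the real setting in which the `crontab` value is replaced by a placeholder number so the example runs without Celery.

```python
def select_schedule(all_schedules, instance_type):
    """Return a copy of the schedule entries that belong to one Beat instance."""
    try:
        entries = all_schedules[instance_type]
    except KeyError:
        known = ", ".join(sorted(all_schedules))
        raise ValueError(
            f"Unknown instance type {instance_type!r}; expected one of: {known}"
        ) from None
    # Copy each entry so callers can modify the result without touching settings
    return {name: dict(entry) for name, entry in entries.items()}


# Shortened stand-in for settings.REDIS_BEAT_SCHEDULE (assumption for this example)
SAMPLE_SCHEDULES = {
    "instance_a": {
        "health-check-instance-a": {
            "task": "apps.celery_app.tasks.health_check",
            "schedule": 30.0,
            "options": {"queue": "instance_a_queue"},
        },
    },
    "instance_b": {
        "daily-cleanup-instance-b": {
            "task": "apps.celery_app.tasks.daily_cleanup",
            "schedule": 86400.0,  # placeholder; the real setting uses crontab(hour=2, minute=0)
            "options": {"queue": "instance_b_queue"},
        },
    },
}

schedule_a = select_schedule(SAMPLE_SCHEDULES, "instance_a")
print(sorted(schedule_a))  # ['health-check-instance-a']
```

The selected dictionary could then be assigned to the Celery application's `beat_schedule` setting in the process that runs the corresponding instance.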

    Redis Synchronization Service

    # apps/celery_app/redis_sync.py
    import json
    import logging
    from datetime import timedelta

    import redis
    from django.conf import settings
    from django.utils import timezone

    from .models import BeatInstance, BeatTaskLock

    logger = logging.getLogger(__name__)


    class RedisBeatSyncService:
        """Beat synchronization service backed by Redis."""

        def __init__(self, instance_name, instance_type):
            self.instance_name = instance_name
            self.instance_type = instance_type
            self.config = settings.REDIS_BEAT_CONFIG.get(instance_type, {})

            # Instance-specific Redis database: swap the DB number at the end of REDIS_URL
            db = self.config.get('database', 0)
            self.redis_url = f"{settings.REDIS_URL.rsplit('/', 1)[0]}/{db}"
            # decode_responses=True makes the client return str instead of bytes
            self.redis_client = redis.from_url(self.redis_url, decode_responses=True)

            self.key_prefix = self.config.get('key_prefix', 'beat:')
            self.lock_timeout = self.config.get('lock_timeout', 300)
            self.heartbeat_interval = self.config.get('heartbeat_interval', 60)

        def _instance_key(self, instance_name=None):
            """Return the Redis key that stores the metadata of one instance."""
            return f"{self.key_prefix}instance:{instance_name or self.instance_name}"

        def register_instance(self):
            """Register the Beat instance in Redis and in the Django database."""
            instance_data = {
                'name': self.instance_name,
                'type': self.instance_type,
                'broker': 'redis',
                'registered_at': timezone.now().isoformat(),
                'last_heartbeat': timezone.now().isoformat(),
                'status': 'active',
                'config': self.config
            }

            # Store in Redis with an expiry so a dead instance disappears on its own
            self.redis_client.setex(
                self._instance_key(),
                self.lock_timeout,
                json.dumps(instance_data)
            )

            # Add to the set of active instances
            self.redis_client.sadd(f"{self.key_prefix}active_instances", self.instance_name)

            # Create or update the Django record
            instance, created = BeatInstance.objects.get_or_create(
                name=self.instance_name,
                defaults={
                    'instance_type': self.instance_type,
                    'queue_name': f"{self.instance_type}_queue",
                    'broker_type': 'redis',
                    'config_data': self.config
                }
            )

            if not created:
                instance.last_heartbeat = timezone.now()
                instance.broker_type = 'redis'
                instance.save()

            logger.info(f"Redis instance {self.instance_name} registered")

        def update_heartbeat(self):
            """Refresh the heartbeat of the instance."""
            redis_key = self._instance_key()
            instance_data = self.redis_client.get(redis_key)

            if not instance_data:
                # The Redis entry expired or was never written: register again
                self.register_instance()
                return

            data = json.loads(instance_data)
            data['last_heartbeat'] = timezone.now().isoformat()

            # Rewrite the entry, which also renews its expiry
            self.redis_client.setex(redis_key, self.lock_timeout, json.dumps(data))

            # Update the Django record
            try:
                instance = BeatInstance.objects.get(name=self.instance_name)
                instance.last_heartbeat = timezone.now()
                instance.save(update_fields=['last_heartbeat'])
            except BeatInstance.DoesNotExist:
                self.register_instance()

        def acquire_task_lock(self, task_name, timeout=None):
            """Try to acquire the lock for a task; return True on success."""
            if timeout is None:
                timeout = self.lock_timeout

            lock_key = f"{self.key_prefix}lock:{task_name}"
            lock_value = json.dumps({
                'instance': self.instance_name,
                'type': self.instance_type,
                'broker': 'redis',
                'acquired_at': timezone.now().isoformat(),
                'timeout': timeout
            })

            # Atomic acquisition with SET NX EX
            acquired = self.redis_client.set(
                lock_key,
                lock_value,
                nx=True,  # Only set if the key does not exist
                ex=timeout  # Expiry in seconds
            )

            if not acquired:
                logger.info(f"Redis lock already held for {task_name}")
                return False

            # Mirror the lock in Django; update_or_create avoids a unique-key
            # error when a stale row for an expired lock is still present
            try:
                instance = BeatInstance.objects.get(name=self.instance_name)
                BeatTaskLock.objects.update_or_create(
                    lock_key=lock_key,
                    defaults={
                        'task_name': task_name,
                        'instance': instance,
                        'broker_type': 'redis',
                        'expires_at': timezone.now() + timedelta(seconds=timeout),
                        'is_active': True,
                    }
                )
            except BeatInstance.DoesNotExist:
                logger.warning(f"Instance {self.instance_name} is not registered in Django")

            logger.info(f"Redis lock acquired by {self.instance_name} for {task_name}")
            return True

        def release_task_lock(self, task_name):
            """Release the lock for a task if this instance owns it."""
            lock_key = f"{self.key_prefix}lock:{task_name}"

            # Lua script: delete the key only if the stored JSON names this instance.
            # Comparing the whole value would fail because it contains the
            # acquisition timestamp, which cannot be rebuilt at release time.
            lua_script = """
            local value = redis.call("get", KEYS[1])
            if value and cjson.decode(value)["instance"] == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
            """

            result = self.redis_client.eval(lua_script, 1, lock_key, self.instance_name)

            if result:
                # Remove the mirrored lock from Django
                BeatTaskLock.objects.filter(
                    lock_key=lock_key,
                    instance__name=self.instance_name
                ).delete()

                logger.info(f"Redis lock released by {self.instance_name} for {task_name}")
                return True
            else:
                logger.warning(f"Could not release Redis lock {task_name}")
                return False

        def get_active_instances(self):
            """Return the metadata of every instance that is still registered."""
            instances = []
            active_instance_names = self.redis_client.smembers(f"{self.key_prefix}active_instances")

            for instance_name in active_instance_names:
                instance_data = self.redis_client.get(self._instance_key(instance_name))
                if instance_data:
                    instances.append(json.loads(instance_data))

            return instances

        def get_task_locks(self):
            """Return every task lock currently present in Redis."""
            locks = []
            lock_pattern = f"{self.key_prefix}lock:*"

            for lock_key in self.redis_client.scan_iter(match=lock_pattern):
                lock_data = self.redis_client.get(lock_key)
                if lock_data:
                    lock_info = json.loads(lock_data)
                    lock_info['lock_key'] = lock_key
                    locks.append(lock_info)

            return locks

        def cleanup_expired_locks(self):
            """Remove locks without expiry and stale lock rows; return the count."""
            lock_pattern = f"{self.key_prefix}lock:*"
            cleaned_count = 0

            for lock_key in self.redis_client.scan_iter(match=lock_pattern):
                # A TTL of -1 means the key has no expiry and would never be released
                if self.redis_client.ttl(lock_key) == -1:
                    self.redis_client.delete(lock_key)
                    cleaned_count += 1

            # Redis drops expired keys by itself; only their Django rows remain
            expired_rows, _ = BeatTaskLock.objects.filter(
                broker_type='redis',
                instance__name=self.instance_name,
                expires_at__lt=timezone.now()
            ).delete()
            cleaned_count += expired_rows

            logger.info(f"Redis cleanup: {cleaned_count} expired locks removed")
            return cleaned_count

        def get_instance_stats(self):
            """Return statistics about this instance."""
            stats = {
                'instance_name': self.instance_name,
                'instance_type': self.instance_type,
                'broker': 'redis',
                'active_locks': len(self.get_task_locks()),
                'config': self.config,
                'redis_info': self.redis_client.info(),
            }

            return stats
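
The service only provides the lock primitives; a task still has to call `acquire_task_lock` and `release_task_lock` around its own body. The following sketch shows one way to wrap that pattern in a decorator. `run_exclusively` and `StubSyncService` are hypothetical names introduced for illustration: the stub stands in for `RedisBeatSyncService` so that the example runs without Redis or Django.

```python
import functools


def run_exclusively(sync_service, task_name):
    """Run the wrapped function only if sync_service grants the task lock."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not sync_service.acquire_task_lock(task_name):
                return None  # Another process holds the lock: skip this run
            try:
                return func(*args, **kwargs)
            finally:
                # Release the lock even when the task raises
                sync_service.release_task_lock(task_name)
        return wrapper
    return decorator


class StubSyncService:
    """In-memory stand-in exposing the two lock methods used by the decorator."""

    def __init__(self):
        self.held = set()

    def acquire_task_lock(self, task_name, timeout=None):
        if task_name in self.held:
            return False
        self.held.add(task_name)
        return True

    def release_task_lock(self, task_name):
        self.held.discard(task_name)
        return True


service = StubSyncService()


@run_exclusively(service, "health_check")
def health_check():
    return "ok"


print(health_check())             # lock is free: prints ok
service.held.add("health_check")  # simulate another process holding the lock
print(health_check())             # lock is taken: prints None
```

With the settings shown earlier, each instance type uses its own Redis database and key prefix, so a lock taken this way is only visible to processes configured with the same instance type.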

    Author: Mvondo bekey anael

    ceo | founder