📝 Introduction : Ce guide détaillé vous explique comment créer, configurer et exécuter plusieurs instances de Celery Beat (Instance A et Instance B) dans un même projet Django. Nous verrons comment les synchroniser via Redis/RabbitMQ et éviter les conflits d'exécution.
📝 Introduction : Ce guide détaillé vous explique comment créer, configurer et exécuter plusieurs instances de Celery Beat (Instance A et Instance B) dans un même projet Django. Nous verrons comment les synchroniser via Redis/RabbitMQ et éviter les conflits d'exécution.
📋 Table des Matières
- Installation et Configuration
- Structure du Projet Django
- Modèles Django pour la Gestion
- Configuration Instance A (Haute Priorité)
- Configuration Instance B (Basse Priorité)
- Configuration Redis Détaillée
- Configuration RabbitMQ Détaillée
- Synchronisation et Verrouillage
- Exécution et Déploiement
- Monitoring et Gestion
📦 Installation et Configuration
Dépendances Requises
# requirements.txt
Django==4.2.7
celery==5.3.4
django-celery-beat==2.5.0
redis==5.0.1
kombu==5.3.4
pika==1.3.2
python-decouple==3.8
psycopg2-binary==2.9.9
supervisor==4.2.5
# requirements.txt
Django==4.2.7
celery==5.3.4
django-celery-beat==2.5.0
redis==5.0.1
kombu==5.3.4
pika==1.3.2
python-decouple==3.8
psycopg2-binary==2.9.9
supervisor==4.2.5
Installation des Paquets
# Install the Python dependencies pinned in requirements.txt
pip install -r requirements.txt
# Install Redis, start it now and enable it at boot
sudo apt update
sudo apt install redis-server
sudo systemctl start redis-server
sudo systemctl enable redis-server
# Install RabbitMQ, start it now, enable it at boot,
# then turn on the rabbitmq_management plugin
sudo apt install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
# Install the Python dependencies pinned in requirements.txt
pip install -r requirements.txt
# Install Redis, start it now and enable it at boot
sudo apt update
sudo apt install redis-server
sudo systemctl start redis-server
sudo systemctl enable redis-server
# Install RabbitMQ, start it now, enable it at boot,
# then turn on the rabbitmq_management plugin
sudo apt install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
🏗️ Structure du Projet Django
my_django_project/
├── manage.py
├── requirements.txt
├── .env
├── my_project/
│ ├── __init__.py
│ ├── settings/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── redis.py
│ │ ├── rabbitmq.py
│ │ └── production.py
│ ├── urls.py
│ └── wsgi.py
├── apps/
│ ├── __init__.py
│ ├── celery_app/
│ │ ├── __init__.py
│ │ ├── celery.py
│ │ ├── models.py
│ │ ├── tasks.py
│ │ ├── admin.py
│ │ ├── redis_sync.py
│ │ ├── rabbitmq_sync.py
│ │ └── management/
│ │ └── commands/
│ │ ├── __init__.py
│ │ ├── start_beat_instance.py
│ │ ├── manage_instances.py
│ │ └── test_sync.py
│ └── notifications/
│ ├── models.py
│ └── tasks.py
├── static/
├── media/
├── logs/
├── scripts/
│ ├── start_instance_a.sh
│ ├── start_instance_b.sh
│ └── monitor_instances.sh
└── docker-compose.yml
my_django_project/
├── manage.py
├── requirements.txt
├── .env
├── my_project/
│ ├── __init__.py
│ ├── settings/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── redis.py
│ │ ├── rabbitmq.py
│ │ └── production.py
│ ├── urls.py
│ └── wsgi.py
├── apps/
│ ├── __init__.py
│ ├── celery_app/
│ │ ├── __init__.py
│ │ ├── celery.py
│ │ ├── models.py
│ │ ├── tasks.py
│ │ ├── admin.py
│ │ ├── redis_sync.py
│ │ ├── rabbitmq_sync.py
│ │ └── management/
│ │ └── commands/
│ │ ├── __init__.py
│ │ ├── start_beat_instance.py
│ │ ├── manage_instances.py
│ │ └── test_sync.py
│ └── notifications/
│ ├── models.py
│ └── tasks.py
├── static/
├── media/
├── logs/
├── scripts/
│ ├── start_instance_a.sh
│ ├── start_instance_b.sh
│ └── monitor_instances.sh
└── docker-compose.yml
🗄️ Modèles Django pour la Gestion
Modèle Instance Beat
# apps/celery_app/models.py
from django.db import models
from django.utils import timezone
import uuid
import json
class BeatInstance(models.Model):
    """Registry row describing one Celery Beat scheduler instance.

    Stores the instance identity, the queue and broker it is attached to,
    its last heartbeat and a free-form JSON configuration.
    """

    INSTANCE_TYPES = [
        ('instance_a', 'Instance A - Haute Priorité'),
        ('instance_b', 'Instance B - Basse Priorité'),
    ]
    STATUS_CHOICES = [
        ('active', 'Active'),
        ('inactive', 'Inactive'),
        ('error', 'Erreur'),
    ]
    # Default number of seconds without a heartbeat before the instance is
    # reported offline by is_online() (5 minutes).
    ONLINE_TIMEOUT_SECONDS = 300

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=100, unique=True)
    instance_type = models.CharField(max_length=20, choices=INSTANCE_TYPES)
    queue_name = models.CharField(max_length=50)
    status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='active')
    broker_type = models.CharField(
        max_length=10,
        choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')],
        default='redis',
    )
    last_heartbeat = models.DateTimeField(default=timezone.now)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    config_data = models.JSONField(default=dict, blank=True)

    class Meta:
        verbose_name = "Instance Beat"
        verbose_name_plural = "Instances Beat"
        ordering = ['-last_heartbeat']

    def __str__(self):
        return f"{self.name} ({self.instance_type}) - {self.broker_type}"

    def is_online(self, timeout_seconds=None):
        """Return True when the last heartbeat is recent enough.

        Args:
            timeout_seconds: Maximum heartbeat age in seconds. Defaults to
                ``ONLINE_TIMEOUT_SECONDS`` (300) when omitted.
        """
        if timeout_seconds is None:
            timeout_seconds = self.ONLINE_TIMEOUT_SECONDS
        heartbeat_age = timezone.now() - self.last_heartbeat
        return heartbeat_age.total_seconds() < timeout_seconds

    def get_config(self):
        """Return the JSON configuration stored for this instance."""
        return self.config_data

    def set_config(self, config_dict):
        """Replace the stored configuration and persist it immediately.

        Args:
            config_dict: JSON-serialisable mapping saved into ``config_data``.
        """
        self.config_data = config_dict
        # auto_now fields are only refreshed when they are listed in
        # update_fields, so updated_at must be named explicitly here.
        self.save(update_fields=['config_data', 'updated_at'])
class BeatTaskLock(models.Model):
    """Database record of a task lock held by a Beat instance."""

    task_name = models.CharField(max_length=255)
    instance = models.ForeignKey(BeatInstance, on_delete=models.CASCADE)
    lock_key = models.CharField(max_length=100, unique=True)
    broker_type = models.CharField(
        max_length=10,
        choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')],
    )
    acquired_at = models.DateTimeField(auto_now_add=True)
    expires_at = models.DateTimeField()
    is_active = models.BooleanField(default=True)

    class Meta:
        verbose_name = "Verrou de Tâche Beat"
        verbose_name_plural = "Verrous de Tâches Beat"
        indexes = [
            models.Index(fields=['lock_key', 'expires_at']),
            models.Index(fields=['instance', 'is_active']),
            models.Index(fields=['broker_type']),
        ]

    def __str__(self):
        return f"{self.task_name} - {self.instance.name} ({self.broker_type})"

    def is_expired(self):
        """Return True once the current time is past ``expires_at``."""
        now = timezone.now()
        return now > self.expires_at
class BeatTaskExecution(models.Model):
    """Audit trail of one task run triggered by a Beat instance."""

    task_name = models.CharField(max_length=255)
    instance = models.ForeignKey(BeatInstance, on_delete=models.CASCADE)
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    status = models.CharField(
        max_length=20,
        choices=[
            ('running', 'En cours'),
            ('completed', 'Terminé'),
            ('failed', 'Échoué'),
            ('skipped', 'Ignoré'),
        ],
        default='running',
    )
    result = models.TextField(blank=True)
    error_message = models.TextField(blank=True)
    execution_time = models.FloatField(null=True, blank=True)
    broker_type = models.CharField(
        max_length=10,
        choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')],
    )

    class Meta:
        verbose_name = "Exécution de Tâche Beat"
        verbose_name_plural = "Exécutions de Tâches Beat"
        ordering = ['-started_at']

    def __str__(self):
        return f"{self.task_name} - {self.instance.name} ({self.broker_type})"

    def _close(self, status):
        """Set the final status, the completion time and the duration."""
        self.status = status
        self.completed_at = timezone.now()
        # started_at is filled by auto_now_add on first save; it is still
        # None on an instance that was never persisted.
        if self.started_at:
            elapsed = self.completed_at - self.started_at
            self.execution_time = elapsed.total_seconds()

    def mark_completed(self, result=None):
        """Mark the run as completed and save it.

        Args:
            result: Optional task result, stored as text. Falsy values such
                as ``0`` or ``False`` are kept; only ``None`` is skipped.
        """
        self._close('completed')
        if result is not None:
            self.result = str(result)
        self.save()

    def mark_failed(self, error_message=None):
        """Mark the run as failed and save it.

        Args:
            error_message: Optional error description or exception; it is
                converted to text before being stored.
        """
        self._close('failed')
        if error_message is not None:
            self.error_message = str(error_message)
        self.save()
# apps/celery_app/models.py
from django.db import models
from django.utils import timezone
import uuid
import json
class BeatInstance(models.Model):
    """Registry row describing one Celery Beat scheduler instance.

    Stores the instance identity, the queue and broker it is attached to,
    its last heartbeat and a free-form JSON configuration.
    """

    INSTANCE_TYPES = [
        ('instance_a', 'Instance A - Haute Priorité'),
        ('instance_b', 'Instance B - Basse Priorité'),
    ]
    STATUS_CHOICES = [
        ('active', 'Active'),
        ('inactive', 'Inactive'),
        ('error', 'Erreur'),
    ]
    # Default number of seconds without a heartbeat before the instance is
    # reported offline by is_online() (5 minutes).
    ONLINE_TIMEOUT_SECONDS = 300

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=100, unique=True)
    instance_type = models.CharField(max_length=20, choices=INSTANCE_TYPES)
    queue_name = models.CharField(max_length=50)
    status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='active')
    broker_type = models.CharField(
        max_length=10,
        choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')],
        default='redis',
    )
    last_heartbeat = models.DateTimeField(default=timezone.now)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    config_data = models.JSONField(default=dict, blank=True)

    class Meta:
        verbose_name = "Instance Beat"
        verbose_name_plural = "Instances Beat"
        ordering = ['-last_heartbeat']

    def __str__(self):
        return f"{self.name} ({self.instance_type}) - {self.broker_type}"

    def is_online(self, timeout_seconds=None):
        """Return True when the last heartbeat is recent enough.

        Args:
            timeout_seconds: Maximum heartbeat age in seconds. Defaults to
                ``ONLINE_TIMEOUT_SECONDS`` (300) when omitted.
        """
        if timeout_seconds is None:
            timeout_seconds = self.ONLINE_TIMEOUT_SECONDS
        heartbeat_age = timezone.now() - self.last_heartbeat
        return heartbeat_age.total_seconds() < timeout_seconds

    def get_config(self):
        """Return the JSON configuration stored for this instance."""
        return self.config_data

    def set_config(self, config_dict):
        """Replace the stored configuration and persist it immediately.

        Args:
            config_dict: JSON-serialisable mapping saved into ``config_data``.
        """
        self.config_data = config_dict
        # auto_now fields are only refreshed when they are listed in
        # update_fields, so updated_at must be named explicitly here.
        self.save(update_fields=['config_data', 'updated_at'])
class BeatTaskLock(models.Model):
    """Database record of a task lock held by a Beat instance."""

    task_name = models.CharField(max_length=255)
    instance = models.ForeignKey(BeatInstance, on_delete=models.CASCADE)
    lock_key = models.CharField(max_length=100, unique=True)
    broker_type = models.CharField(
        max_length=10,
        choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')],
    )
    acquired_at = models.DateTimeField(auto_now_add=True)
    expires_at = models.DateTimeField()
    is_active = models.BooleanField(default=True)

    class Meta:
        verbose_name = "Verrou de Tâche Beat"
        verbose_name_plural = "Verrous de Tâches Beat"
        indexes = [
            models.Index(fields=['lock_key', 'expires_at']),
            models.Index(fields=['instance', 'is_active']),
            models.Index(fields=['broker_type']),
        ]

    def __str__(self):
        return f"{self.task_name} - {self.instance.name} ({self.broker_type})"

    def is_expired(self):
        """Return True once the current time is past ``expires_at``."""
        now = timezone.now()
        return now > self.expires_at
class BeatTaskExecution(models.Model):
    """Audit trail of one task run triggered by a Beat instance."""

    task_name = models.CharField(max_length=255)
    instance = models.ForeignKey(BeatInstance, on_delete=models.CASCADE)
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    status = models.CharField(
        max_length=20,
        choices=[
            ('running', 'En cours'),
            ('completed', 'Terminé'),
            ('failed', 'Échoué'),
            ('skipped', 'Ignoré'),
        ],
        default='running',
    )
    result = models.TextField(blank=True)
    error_message = models.TextField(blank=True)
    execution_time = models.FloatField(null=True, blank=True)
    broker_type = models.CharField(
        max_length=10,
        choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')],
    )

    class Meta:
        verbose_name = "Exécution de Tâche Beat"
        verbose_name_plural = "Exécutions de Tâches Beat"
        ordering = ['-started_at']

    def __str__(self):
        return f"{self.task_name} - {self.instance.name} ({self.broker_type})"

    def _close(self, status):
        """Set the final status, the completion time and the duration."""
        self.status = status
        self.completed_at = timezone.now()
        # started_at is filled by auto_now_add on first save; it is still
        # None on an instance that was never persisted.
        if self.started_at:
            elapsed = self.completed_at - self.started_at
            self.execution_time = elapsed.total_seconds()

    def mark_completed(self, result=None):
        """Mark the run as completed and save it.

        Args:
            result: Optional task result, stored as text. Falsy values such
                as ``0`` or ``False`` are kept; only ``None`` is skipped.
        """
        self._close('completed')
        if result is not None:
            self.result = str(result)
        self.save()

    def mark_failed(self, error_message=None):
        """Mark the run as failed and save it.

        Args:
            error_message: Optional error description or exception; it is
                converted to text before being stored.
        """
        self._close('failed')
        if error_message is not None:
            self.error_message = str(error_message)
        self.save()
🔴 Configuration Redis Détaillée
Settings Redis
# my_project/settings/redis.py
# Redis-backed Celery settings: connection URL, per-instance Beat parameters,
# queue routing and the periodic schedules of instances A and B.
import os
from urllib.parse import quote

from celery.schedules import crontab  # required by REDIS_BEAT_SCHEDULE below
from decouple import config
from kombu import Queue

# Redis connection parameters, read from the environment / .env file.
# Numeric values are cast so they are integers whether they come from the
# environment (strings) or from the defaults.
REDIS_HOST = config('REDIS_HOST', default='localhost')
REDIS_PORT = config('REDIS_PORT', default=6379, cast=int)
REDIS_PASSWORD = config('REDIS_PASSWORD', default='')
REDIS_DB = config('REDIS_DB', default=0, cast=int)

# Redis URL. The password is percent-encoded so characters such as '@', ':'
# or '/' cannot corrupt the URL structure.
if REDIS_PASSWORD:
    REDIS_URL = (
        f"redis://:{quote(REDIS_PASSWORD, safe='')}"
        f"@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
    )
else:
    REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"

# Celery uses the same Redis server as broker and result backend.
CELERY_BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL

# Per-instance Redis parameters for the Beat instances.
REDIS_BEAT_CONFIG = {
    'instance_a': {
        'database': 1,  # dedicated Redis DB for instance A
        'key_prefix': 'beat_a:',
        'lock_timeout': 300,  # 5 minutes
        'heartbeat_interval': 30,  # 30 seconds
    },
    'instance_b': {
        'database': 2,  # dedicated Redis DB for instance B
        'key_prefix': 'beat_b:',
        'lock_timeout': 600,  # 10 minutes
        'heartbeat_interval': 60,  # 1 minute
    },
}

# Task-to-queue routing: three tasks go to instance A's queue, three to
# instance B's queue.
CELERY_TASK_ROUTES = {
    'apps.celery_app.tasks.health_check': {'queue': 'instance_a_queue'},
    'apps.celery_app.tasks.system_monitor': {'queue': 'instance_a_queue'},
    'apps.celery_app.tasks.critical_alerts': {'queue': 'instance_a_queue'},
    'apps.celery_app.tasks.daily_cleanup': {'queue': 'instance_b_queue'},
    'apps.celery_app.tasks.weekly_reports': {'queue': 'instance_b_queue'},
    'apps.celery_app.tasks.data_export': {'queue': 'instance_b_queue'},
}

# Queue declarations.
CELERY_TASK_QUEUES = (
    Queue('instance_a_queue', routing_key='instance_a'),
    Queue('instance_b_queue', routing_key='instance_b'),
    Queue('default', routing_key='default'),
)

# Beat scheduler configuration.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_BEAT_SYNC_EVERY = 1

# Periodic schedule entries grouped by instance type.
# NOTE(review): the code that consumes this mapping is not shown here;
# presumably each instance loads only its own sub-dictionary - confirm.
REDIS_BEAT_SCHEDULE = {
    'instance_a': {
        'health-check-instance-a': {
            'task': 'apps.celery_app.tasks.health_check',
            'schedule': 30.0,  # seconds
            'options': {'queue': 'instance_a_queue'},
        },
        'system-monitor-instance-a': {
            'task': 'apps.celery_app.tasks.system_monitor',
            'schedule': 60.0,  # seconds
            'options': {'queue': 'instance_a_queue'},
        },
        'critical-alerts-instance-a': {
            'task': 'apps.celery_app.tasks.critical_alerts',
            'schedule': 120.0,  # seconds
            'options': {'queue': 'instance_a_queue'},
        },
    },
    'instance_b': {
        'daily-cleanup-instance-b': {
            'task': 'apps.celery_app.tasks.daily_cleanup',
            'schedule': crontab(hour=2, minute=0),  # every day at 02:00
            'options': {'queue': 'instance_b_queue'},
        },
        'weekly-reports-instance-b': {
            'task': 'apps.celery_app.tasks.weekly_reports',
            # day_of_week=1 is Monday, at 06:00
            'schedule': crontab(hour=6, minute=0, day_of_week=1),
            'options': {'queue': 'instance_b_queue'},
        },
        'data-export-instance-b': {
            'task': 'apps.celery_app.tasks.data_export',
            # day_of_week=0 is Sunday, at 04:00
            'schedule': crontab(hour=4, minute=0, day_of_week=0),
            'options': {'queue': 'instance_b_queue'},
        },
    },
}
# my_project/settings/redis.py
# Redis-backed Celery settings: connection URL, per-instance Beat parameters,
# queue routing and the periodic schedules of instances A and B.
import os
from urllib.parse import quote

from celery.schedules import crontab  # required by REDIS_BEAT_SCHEDULE below
from decouple import config
from kombu import Queue

# Redis connection parameters, read from the environment / .env file.
# Numeric values are cast so they are integers whether they come from the
# environment (strings) or from the defaults.
REDIS_HOST = config('REDIS_HOST', default='localhost')
REDIS_PORT = config('REDIS_PORT', default=6379, cast=int)
REDIS_PASSWORD = config('REDIS_PASSWORD', default='')
REDIS_DB = config('REDIS_DB', default=0, cast=int)

# Redis URL. The password is percent-encoded so characters such as '@', ':'
# or '/' cannot corrupt the URL structure.
if REDIS_PASSWORD:
    REDIS_URL = (
        f"redis://:{quote(REDIS_PASSWORD, safe='')}"
        f"@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
    )
else:
    REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"

# Celery uses the same Redis server as broker and result backend.
CELERY_BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL

# Per-instance Redis parameters for the Beat instances.
REDIS_BEAT_CONFIG = {
    'instance_a': {
        'database': 1,  # dedicated Redis DB for instance A
        'key_prefix': 'beat_a:',
        'lock_timeout': 300,  # 5 minutes
        'heartbeat_interval': 30,  # 30 seconds
    },
    'instance_b': {
        'database': 2,  # dedicated Redis DB for instance B
        'key_prefix': 'beat_b:',
        'lock_timeout': 600,  # 10 minutes
        'heartbeat_interval': 60,  # 1 minute
    },
}

# Task-to-queue routing: three tasks go to instance A's queue, three to
# instance B's queue.
CELERY_TASK_ROUTES = {
    'apps.celery_app.tasks.health_check': {'queue': 'instance_a_queue'},
    'apps.celery_app.tasks.system_monitor': {'queue': 'instance_a_queue'},
    'apps.celery_app.tasks.critical_alerts': {'queue': 'instance_a_queue'},
    'apps.celery_app.tasks.daily_cleanup': {'queue': 'instance_b_queue'},
    'apps.celery_app.tasks.weekly_reports': {'queue': 'instance_b_queue'},
    'apps.celery_app.tasks.data_export': {'queue': 'instance_b_queue'},
}

# Queue declarations.
CELERY_TASK_QUEUES = (
    Queue('instance_a_queue', routing_key='instance_a'),
    Queue('instance_b_queue', routing_key='instance_b'),
    Queue('default', routing_key='default'),
)

# Beat scheduler configuration.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_BEAT_SYNC_EVERY = 1

# Periodic schedule entries grouped by instance type.
# NOTE(review): the code that consumes this mapping is not shown here;
# presumably each instance loads only its own sub-dictionary - confirm.
REDIS_BEAT_SCHEDULE = {
    'instance_a': {
        'health-check-instance-a': {
            'task': 'apps.celery_app.tasks.health_check',
            'schedule': 30.0,  # seconds
            'options': {'queue': 'instance_a_queue'},
        },
        'system-monitor-instance-a': {
            'task': 'apps.celery_app.tasks.system_monitor',
            'schedule': 60.0,  # seconds
            'options': {'queue': 'instance_a_queue'},
        },
        'critical-alerts-instance-a': {
            'task': 'apps.celery_app.tasks.critical_alerts',
            'schedule': 120.0,  # seconds
            'options': {'queue': 'instance_a_queue'},
        },
    },
    'instance_b': {
        'daily-cleanup-instance-b': {
            'task': 'apps.celery_app.tasks.daily_cleanup',
            'schedule': crontab(hour=2, minute=0),  # every day at 02:00
            'options': {'queue': 'instance_b_queue'},
        },
        'weekly-reports-instance-b': {
            'task': 'apps.celery_app.tasks.weekly_reports',
            # day_of_week=1 is Monday, at 06:00
            'schedule': crontab(hour=6, minute=0, day_of_week=1),
            'options': {'queue': 'instance_b_queue'},
        },
        'data-export-instance-b': {
            'task': 'apps.celery_app.tasks.data_export',
            # day_of_week=0 is Sunday, at 04:00
            'schedule': crontab(hour=4, minute=0, day_of_week=0),
            'options': {'queue': 'instance_b_queue'},
        },
    },
}
Service de Synchronisation Redis
# apps/celery_app/redis_sync.py
import redis
import json
import time
from django.conf import settings
from django.utils import timezone
from .models import BeatInstance, BeatTaskLock
import logging
logger = logging.getLogger(__name__)
class RedisBeatSyncService:
    """Synchronise a Celery Beat instance through Redis and the Django DB.

    The service keeps an instance record and per-task locks in the Redis
    database configured for the instance type, and mirrors them into the
    ``BeatInstance`` and ``BeatTaskLock`` models.
    """

    # Lua script run atomically by Redis: delete the lock only when the JSON
    # payload stored under KEYS[1] names ARGV[1] as its owning instance.
    # Comparing the owner (not the whole payload) is required because the
    # payload embeds the acquisition timestamp, which cannot be rebuilt at
    # release time.
    _RELEASE_LOCK_SCRIPT = """
    local payload = redis.call("get", KEYS[1])
    if not payload then
        return 0
    end
    local ok, data = pcall(cjson.decode, payload)
    if ok and type(data) == "table" and data["instance"] == ARGV[1] then
        return redis.call("del", KEYS[1])
    end
    return 0
    """

    def __init__(self, instance_name, instance_type):
        """Build the Redis client for the given instance.

        Args:
            instance_name: Unique name of the Beat instance.
            instance_type: Key into ``settings.REDIS_BEAT_CONFIG``
                (e.g. ``'instance_a'``); unknown types fall back to defaults.
        """
        from urllib.parse import urlsplit

        self.instance_name = instance_name
        self.instance_type = instance_type
        self.config = settings.REDIS_BEAT_CONFIG.get(instance_type, {})

        # Point the base URL at the instance's own Redis database by
        # replacing only the path component. Splitting on the first '/'
        # would keep just the "redis:" scheme and produce an invalid URL.
        db = self.config.get('database', 0)
        self.redis_url = urlsplit(settings.REDIS_URL)._replace(path=f"/{db}").geturl()
        self.redis_client = redis.from_url(self.redis_url)

        self.key_prefix = self.config.get('key_prefix', 'beat:')
        self.lock_timeout = self.config.get('lock_timeout', 300)
        self.heartbeat_interval = self.config.get('heartbeat_interval', 60)

    def register_instance(self):
        """Register the instance in Redis and in the Django database."""
        now_iso = timezone.now().isoformat()
        instance_data = {
            'name': self.instance_name,
            'type': self.instance_type,
            'broker': 'redis',
            'registered_at': now_iso,
            'last_heartbeat': now_iso,
            'status': 'active',
            'config': self.config,
        }

        # The record expires after lock_timeout seconds unless a heartbeat
        # renews it.
        redis_key = f"{self.key_prefix}instance"
        self.redis_client.setex(redis_key, self.lock_timeout, json.dumps(instance_data))

        # Track the name in the set of active instances.
        self.redis_client.sadd(f"{self.key_prefix}active_instances", self.instance_name)

        # Create the Django row, or refresh it when it already exists.
        instance, created = BeatInstance.objects.get_or_create(
            name=self.instance_name,
            defaults={
                'instance_type': self.instance_type,
                'queue_name': f"{self.instance_type}_queue",
                'broker_type': 'redis',
                'config_data': self.config,
            },
        )
        if not created:
            instance.last_heartbeat = timezone.now()
            instance.broker_type = 'redis'
            instance.save()

        logger.info("Instance Redis %s enregistrée", self.instance_name)

    def update_heartbeat(self):
        """Refresh the heartbeat in Redis and in the Django database.

        When the Redis record has expired, the instance is registered again
        instead of leaving Redis without any record.
        """
        redis_key = f"{self.key_prefix}instance"
        instance_data = self.redis_client.get(redis_key)
        if not instance_data:
            # register_instance() rewrites the Redis record and also updates
            # the Django row, so nothing else is left to do.
            self.register_instance()
            return

        data = json.loads(instance_data)
        data['last_heartbeat'] = timezone.now().isoformat()
        # Rewrite with a fresh expiration.
        self.redis_client.setex(redis_key, self.lock_timeout, json.dumps(data))

        try:
            instance = BeatInstance.objects.get(name=self.instance_name)
        except BeatInstance.DoesNotExist:
            self.register_instance()
        else:
            instance.last_heartbeat = timezone.now()
            instance.save(update_fields=['last_heartbeat'])

    def acquire_task_lock(self, task_name, timeout=None):
        """Try to take the lock for ``task_name``.

        Args:
            task_name: Name of the task to lock.
            timeout: Lock lifetime in seconds; defaults to ``lock_timeout``.

        Returns:
            True when the lock was acquired, False when it is already held.
        """
        from datetime import timedelta

        if timeout is None:
            timeout = self.lock_timeout

        lock_key = f"{self.key_prefix}lock:{task_name}"
        acquired_at = timezone.now()
        lock_value = json.dumps({
            'instance': self.instance_name,
            'type': self.instance_type,
            'broker': 'redis',
            'acquired_at': acquired_at.isoformat(),
            'timeout': timeout,
        })

        # SET NX EX: write only when the key is absent, with an expiration.
        acquired = self.redis_client.set(lock_key, lock_value, nx=True, ex=timeout)
        if not acquired:
            logger.info("Verrou Redis déjà tenu pour %s", task_name)
            return False

        # Mirror the lock in Django. lock_key is unique, and a row from a
        # lock that expired in Redis may still exist, so update it in place
        # rather than inserting a second row.
        try:
            instance = BeatInstance.objects.get(name=self.instance_name)
        except BeatInstance.DoesNotExist:
            logger.warning(
                "Instance %s introuvable en base : verrou %s non enregistré dans Django",
                self.instance_name,
                task_name,
            )
        else:
            BeatTaskLock.objects.update_or_create(
                lock_key=lock_key,
                defaults={
                    'task_name': task_name,
                    'instance': instance,
                    'broker_type': 'redis',
                    'acquired_at': acquired_at,
                    'expires_at': acquired_at + timedelta(seconds=timeout),
                    'is_active': True,
                },
            )

        logger.info("Verrou Redis acquis par %s pour %s", self.instance_name, task_name)
        return True

    def release_task_lock(self, task_name):
        """Release the lock for ``task_name`` if this instance owns it.

        Returns:
            True when the Redis lock was deleted, False otherwise (lock
            absent, expired, or owned by another instance).
        """
        lock_key = f"{self.key_prefix}lock:{task_name}"
        released = self.redis_client.eval(
            self._RELEASE_LOCK_SCRIPT, 1, lock_key, self.instance_name
        )

        # Remove this instance's bookkeeping row in every case: if the Redis
        # key already expired the row is stale, and rows belonging to other
        # instances are excluded by the filter.
        BeatTaskLock.objects.filter(
            lock_key=lock_key,
            instance__name=self.instance_name,
        ).delete()

        if released:
            logger.info("Verrou Redis libéré par %s pour %s", self.instance_name, task_name)
            return True

        logger.warning("Impossible de libérer le verrou Redis %s", task_name)
        return False

    def get_active_instances(self):
        """Return the active instance records stored under this key prefix.

        Only one record exists per prefix (``<prefix>instance``), so it is
        read once and returned when its name belongs to the active set.
        """
        raw_record = self.redis_client.get(f"{self.key_prefix}instance")
        if not raw_record:
            return []

        record = json.loads(raw_record)
        # Set members are bytes unless the client decodes responses.
        active_names = {
            member.decode('utf-8') if isinstance(member, bytes) else member
            for member in self.redis_client.smembers(f"{self.key_prefix}active_instances")
        }
        return [record] if record.get('name') in active_names else []

    def get_task_locks(self):
        """Return the payload of every task lock currently present in Redis."""
        locks = []
        lock_pattern = f"{self.key_prefix}lock:*"
        for lock_key in self.redis_client.scan_iter(match=lock_pattern):
            lock_data = self.redis_client.get(lock_key)
            if not lock_data:
                # The key expired between the scan and the read.
                continue
            lock_info = json.loads(lock_data)
            lock_info['lock_key'] = (
                lock_key.decode('utf-8') if isinstance(lock_key, bytes) else lock_key
            )
            locks.append(lock_info)
        return locks

    def cleanup_expired_locks(self):
        """Delete lock keys that have no expiration and count the cleanups.

        Returns:
            Number of keys deleted for lacking a TTL plus keys found to have
            already disappeared during the scan.
        """
        lock_pattern = f"{self.key_prefix}lock:*"
        cleaned_count = 0
        for lock_key in self.redis_client.scan_iter(match=lock_pattern):
            ttl = self.redis_client.ttl(lock_key)
            if ttl == -1:
                # No expiration set: such a lock would never go away.
                self.redis_client.delete(lock_key)
                cleaned_count += 1
            elif ttl == -2:
                # Key no longer exists (expired since the scan).
                cleaned_count += 1
        logger.info("Nettoyage Redis: %s verrous expirés supprimés", cleaned_count)
        return cleaned_count

    def get_instance_stats(self):
        """Return a summary dict: identity, lock count, config and Redis INFO."""
        return {
            'instance_name': self.instance_name,
            'instance_type': self.instance_type,
            'broker': 'redis',
            'active_locks': len(self.get_task_locks()),
            'config': self.config,
            'redis_info': self.redis_client.info(),
        }
# apps/celery_app/redis_sync.py import redis import json import time from django.conf import settings from django.utils import timezone from .models import BeatInstance, BeatTaskLock import logginglogger = logging.getLogger(__name__)class RedisBeatSyncService: """Service de synchronisation Beat avec Redis""" def __init__(self, instance_name, instance_type): self.instance_name = instance_name self.instance_type = instance_type self.config = settings.REDIS_BEAT_CONFIG.get(instance_type, {}) # Configuration Redis spécifique à l'instance db = self.config.get('database', 0) self.redis_url = f"{settings.REDIS_URL.split('/')[0]}/{db}" self.redis_client = redis.from_url(self.redis_url) self.key_prefix = self.config.get('key_prefix', 'beat:') self.lock_timeout = self.config.get('lock_timeout', 300) self.heartbeat_interval = self.config.get('heartbeat_interval', 60) def register_instance(self): """Enregistre l'instance Beat dans Redis""" instance_data = { 'name': self.instance_name, 'type': self.instance_type, 'broker': 'redis', 'registered_at': timezone.now().isoformat(), 'last_heartbeat': timezone.now().isoformat(), 'status': 'active', 'config': self.config } # Sauvegarder dans Redis avec expiration redis_key = f"{self.key_prefix}instance" self.redis_client.setex( redis_key, self.lock_timeout, json.dumps(instance_data) ) # Ajouter à la liste des instances actives self.redis_client.sadd(f"{self.key_prefix}active_instances", self.instance_name) # Créer ou mettre à jour dans Django instance, created = BeatInstance.objects.get_or_create( name=self.instance_name, defaults={ 'instance_type': self.instance_type, 'queue_name': f"{self.instance_type}_queue", 'broker_type': 'redis', 'config_data': self.config } ) if not created: instance.last_heartbeat = timezone.now() instance.broker_type = 'redis' instance.save() logger.info(f"Instance Redis {self.instance_name} enregistrée") def update_heartbeat(self): """Met à jour le heartbeat de l'instance""" redis_key = 
f"{self.key_prefix}instance" instance_data = self.redis_client.get(redis_key) if instance_data: data = json.loads(instance_data) data['last_heartbeat'] = timezone.now().isoformat() # Renouveler avec expiration self.redis_client.setex(redis_key, self.lock_timeout, json.dumps(data)) # Mettre à jour dans Django try: instance = BeatInstance.objects.get(name=self.instance_name) instance.last_heartbeat = timezone.now() instance.save(update_fields=['last_heartbeat']) except BeatInstance.DoesNotExist: self.register_instance() def acquire_task_lock(self, task_name, timeout=None): """Acquiert un verrou pour une tâche""" if timeout is