Escalabilidad y Gestión de Tareas
Para manejar cargas elevadas de trabajo sin bloquear la experiencia de usuario (por ejemplo, emisión de PDFs, envíos de WhatsApp o reintentos de cobros), Yastubo utiliza una arquitectura de ejecución asíncrona basada en colas de Redis.
Core Benefits / Key Features
Section titled “Core Benefits / Key Features”- Procesamiento No Bloqueante: Las tareas pesadas se delegan a workers fuera del ciclo de vida de la petición HTTP.
- Durable Execution: Las tareas se reintentan automáticamente en caso de fallo temporal.
- Escalado Independiente: El número de workers puede aumentarse horizontalmente sin afectar a la API principal.
- Caché Distribuida: Redis se utiliza para reducir la carga en la base de datos principal.
Deep Dive Técnico: arq Workers
Section titled “Deep Dive Técnico: arq Workers”Utilizamos arq, una librería de colas ultraligera basada en Redis para Python. La configuración de los workers se define en app/workers/worker.py, donde se registran funciones normales y trabajos programados (cron jobs).
Gestión de Colas
Section titled “Gestión de Colas”Cada vez que la API necesita realizar una tarea asíncrona, encola una “tarea” en Redis. El contenedor worker (o múltiples réplicas del mismo) consume estas tareas secuencialmente.
Tareas Programadas (Cron Jobs)
Section titled “Tareas Programadas (Cron Jobs)”El sistema incluye procesos recurrentes críticos para el negocio:
- Recordatorios de Pago: Ejecución diaria a las 9 AM.
- Reintentos de Stripe: Ejecución cada 6 horas para capturar pagos fallidos.
- Limpieza de Leads: Monitoreo de carritos abandonados.
Ejemplo Práctico: Implementación de Tarea Asíncrona
Section titled “Ejemplo Práctico: Implementación de Tarea Asíncrona”A continuación, se muestra cómo se define y encola una tarea de procesamiento de siniestros.
async def process_approved_claim(ctx, claim_id: int): """ Procesa un siniestro aprobado: genera pagos y notifica al cliente. Lógica técnica: utiliza el ctx['session_factory'] para persistir cambios. """ async with ctx['session_factory']() as db: # Lógica de procesamiento... logger.info(f"Siniestro {claim_id} procesado con éxito")
# app/modules/claims/service.py (Invocación desde la API)from app.core.redis import pool
async def approve_claim(db, claim_id): # ... validaciones ... # Encolar tarea asíncrona await pool.enqueue_job('process_approved_claim', claim_id)Diagrama de Escalado Horizontal
Section titled “Diagrama de Escalado Horizontal”[FLOW: La “API” (Múltiples réplicas) recibe peticiones y escribe tareas en “Redis Cluster”. Múltiples contenedores “Worker” (Réplicas 1..N) consumen de forma competitiva las tareas de la cola. Si un worker falla, la tarea permanece en Redis (o se reintenta según configuración) garantizando la durabilidad].
Optimización de Caché
Section titled “Optimización de Caché”Redis no solo actúa como cola, sino también como capa de caché para objetos de acceso frecuente, como la configuración de planes de seguros o los recargos por país, reduciendo la latencia de respuesta en un 40%.