Mejores Prácticas para Comunicación entre Agentes de IA
Técnicas, protocolos y formatos para que tus agentes de IA se comuniquen de forma eficiente. Con ejemplos de código JSON, Python y patrones probados en producción.
En un sistema multi-agente, la comunicación no es un detalle menor: es el sistema circulatorio. Si los agentes no se comunican bien, el sistema entero falla. Mensajes perdidos, formatos incompatibles, timeouts mal configurados — cada uno de estos problemas puede tumbar un sistema en producción.
Esta guía recoge las mejores prácticas que hemos aprendido diseñando sistemas multi-agente en producción. Cubrimos desde el formato básico de un mensaje hasta patrones avanzados de comunicación asíncrona, con ejemplos de código que puedes implementar hoy.
Fundamentos de la comunicación entre agentes
Antes de hablar de formatos y protocolos, necesitamos entender los principios fundamentales que toda comunicación entre agentes debería seguir:
- Mensajes autónomos: Cada mensaje debe ser autocontenido. Quien lo recibe debe poder procesarlo sin necesidad de contexto externo.
- Identificación única: Cada mensaje necesita un ID único para tracking, debugging y deduplicación.
- Contrato explícito: Los agentes deben acordar un esquema de mensaje. Sin contrato, hay caos.
- Asincronía por defecto: Los agentes no deberían bloquearse esperando respuestas. La comunicación asíncrona es más resiliente.
- Manejo de errores: Los mensajes fallidos no se pierden. Van a una cola de revisión o se reintentan.
Estos principios aplican independientemente del formato o protocolo que uses. Violar cualquiera de ellos te traerá problemas a medida que el sistema crezca.
"La comunicación entre agentes es como un contrato entre servicios. Si no está bien definido, alguien va a incumplirlo."
Formatos de mensaje: JSON, Protocol Buffers y más
El formato del mensaje determina cómo se serializa, transmite y deserializa la información entre agentes. Cada formato tiene sus ventajas y desventajas:
| Formato | Tamaño | Velocidad | Legibilidad | Esquema | Ecosistema |
|---|---|---|---|---|---|
| JSON | Grande | Media | ⭐⭐⭐ Excelente | Opcional (JSON Schema) | Universal |
| Protocol Buffers | Pequeño | Alta | ⭐⭐ Baja (binario) | Obligatorio (.proto) | Multi-lenguaje |
| MessagePack | Pequeño | Alta | ⭐⭐ Media | Opcional | Multi-lenguaje |
| YAML | Grande | Baja | ⭐⭐⭐ Excelente | Opcional | Configuración |
| Markdown estructurado | Grande | Baja | ⭐⭐⭐ Excelente | No formal | LLMs |
Nuestra recomendación
Para el 90% de los casos, JSON es la opción correcta. Es legible, universal, fácil de debuggear y todas las herramientas lo soportan. Solo considera Protocol Buffers si necesitas rendimiento extremo o estás transmitiendo volúmenes masivos de datos.
Para mensajes que serán procesados por LLMs (no por código), el markdown estructurado puede ser una mejor opción, ya que los modelos lo entienden de forma natural. Pero para comunicación entre agentes y orquestadores, JSON es el estándar de facto.
Esquema de mensaje recomendado
Después de construir varios sistemas multi-agente, hemos convergido en este esquema de mensaje que cubre el 95% de los casos de uso:
{
"protocol_version": "1.0",
"message_id": "msg_2kf8d93jf4",
"correlation_id": "corr_7h3kf92",
// Quién envía y quién recibe
"source": {
"agent": "investigador_mercado",
"crew": "crew_analisis_mercado",
"instance_id": "inst_abc123"
},
"target": {
"agent": "analista_datos",
"crew": "crew_analisis_mercado"
},
// Metadatos de trazabilidad
"timestamp": "2026-06-21T15:30:00.123Z",
"ttl_seconds": 300,
"priority": 3,
// Tipo de mensaje y payload
"message_type": "task_request",
// task_request | task_result | task_error | heartbeat | log | control
"payload": {
"task_id": "task_789",
"type": "analyze_market",
"input": {
"market": "LATAM",
"sector": "fintech",
"year": 2026
},
"context": {
"session_id": "session_456",
"previous_results": ["result_task_788"]
}
},
// Estado y metadatos de ejecución
"status": {
"code": "pending",
// pending | processing | completed | failed | cancelled
"attempt": 1,
"max_attempts": 3
},
// Firma para verificar integridad
"signature": "sig_a1b2c3d4..."
}
Este esquema incluye todo lo necesario para un sistema robusto:
- Protocolo y versionado: Permite evolucionar el formato sin romper compatibilidad.
- Trazabilidad completa: message_id único + correlation_id para seguir cadenas de mensajes.
- Origen y destino explícitos: Cada mensaje sabe quién lo envía y quién debe recibirlo.
- Time-to-live: Los mensajes caducan automáticamente, evitando procesar información obsoleta.
- Tipado de mensaje: Diferentes tipos para diferentes propósitos (tareas, resultados, errores, heartbeats).
- Control de reintentos: El propio mensaje lleva la cuenta de intentos, evitando loops infinitos.
Patrones de comunicación
Más allá del formato del mensaje, necesitas decidir cómo se comunican los agentes. Aquí tienes los patrones más utilizados:
1. Request-Response (Síncrono)
En qué consiste: Un agente envía una solicitud y espera la respuesta.
Cuándo usarlo: Cuando necesitas una respuesta inmediata para continuar. Ideal para pipelines lineales.
Ejemplo: Agente A pide un análisis al Agente B, espera el resultado, y lo usa como entrada para su siguiente paso.
Implementación: Llamadas HTTP directas, gRPC, o funciones asíncronas con await.
2. Publish-Subscribe (Asíncrono)
En qué consiste: Un agente publica un mensaje y múltiples agentes suscritos lo reciben.
Cuándo usarlo: Cuando múltiples agentes necesitan reaccionar al mismo evento.
Ejemplo: Un agente detector de anomalías publica un evento "anomalía detectada" y tres agentes suscritos (alertas, logging, backup) reaccionan en paralelo.
Implementación: Redis Pub/Sub, RabbitMQ exchanges, Kafka topics.
3. Message Queue (Cola de trabajo)
En qué consiste: Los mensajes se encolan y los agentes los procesan cuando están disponibles.
Cuándo usarlo: Cuando hay desequilibrio entre la velocidad de producción y consumo de mensajes, o para distribuir carga entre agentes idénticos.
Ejemplo: 100 tareas de análisis encoladas, 3 agentes analistas las toman de la cola según su disponibilidad.
Implementación: Redis Lists, RabbitMQ queues, Amazon SQS.
4. Blackboard (Memoria compartida)
En qué consiste: Los agentes leen y escriben en un espacio de memoria compartido.
Cuándo usarlo: Cuando múltiples agentes necesitan acceder y modificar el mismo estado de forma concurrente.
Ejemplo: Un equipo de agentes trabajando en un documento colaborativo, cada uno aporta su sección.
Implementación: Redis (con locks optimistas), base de datos compartida, sistema de archivos.
| Patrón | Acoplamiento | Escalabilidad | Complejidad | Latencia |
|---|---|---|---|---|
| Request-Response | Alto | Baja | Baja | Baja |
| Pub-Sub | Bajo | Alta | Media | Media |
| Message Queue | Bajo | Muy alta | Media | Media-Alta |
| Blackboard | Medio | Media | Alta | Variable |
Colas de mensajes: Redis vs RabbitMQ vs Kafka
Para sistemas multi-agente que escalan, las colas de mensajes son prácticamente obligatorias. Aquí tienes las opciones más populares y cuándo usar cada una:
| Característica | Redis | RabbitMQ | Apache Kafka |
|---|---|---|---|
| Tipo | In-memory + persistencia opcional | Message broker clásico | Event streaming platform |
| Rendimiento | ~100K msg/s | ~50K msg/s | ~1M+ msg/s |
| Persistencia | ⚠️ Limitada | ✅ Alta | ✅ Muy alta |
| Complejidad setup | 🟢 Baja | 🟡 Media | 🔴 Alta |
| Casos de uso | Caching, colas simples, pub-sub ligero | Colas de trabajo, routing complejo | Event sourcing, streaming, logs |
| Ideal para agentes | 2-10 agentes, prototipos | 5-30 agentes, producción | 20+ agentes, alta escala |
Recomendación para startups
Para la mayoría de startups building sistemas multi-agente, Redis es el punto de partida ideal. Es simple de configurar, rápido y suficiente para sistemas de hasta 10-15 agentes. Cuando superes esa escala, migra a RabbitMQ o directamente a Kafka si necesitas event sourcing.
Si usas MakeYourCrew, no necesitas preocuparte por esto: la plataforma incluye colas de mensajes gestionadas que escalan automáticamente con tu sistema.
Implementación práctica en Python
Vamos a implementar un sistema de comunicación entre agentes usando Redis como cola de mensajes. Este es un patrón que funciona bien para equipos de 3-10 agentes.
import json
import uuid
from datetime import datetime, timezone
from typing import Optional
import redis.asyncio as redis
class MessageBus:
"""Sistema de comunicación entre agentes usando Redis."""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
self.prefix = "agents:"
async def send(
self,
recipient: str,
sender: str,
message_type: str,
payload: dict,
ttl: int = 300
) -> str:
"""Envía un mensaje a un agente específico."""
message_id = f"msg_{uuid.uuid4().hex[:12]}"
message = {
"protocol_version": "1.0",
"message_id": message_id,
"source": {"agent": sender},
"target": {"agent": recipient},
"timestamp": datetime.now(timezone.utc).isoformat(),
"ttl_seconds": ttl,
"message_type": message_type,
"payload": payload,
"status": {"code": "pending", "attempt": 1, "max_attempts": 3}
}
queue_key = f"{self.prefix}queue:{recipient}"
message_key = f"{self.prefix}msg:{message_id}"
# Almacenar mensaje con TTL
await self.redis.setex(message_key, ttl, json.dumps(message))
# Encolar referencia al mensaje
await self.redis.lpush(queue_key, message_id)
return message_id
async def receive(self, agent_name: str, timeout: int = 5) -> Optional[dict]:
"""Recibe el siguiente mensaje para un agente (bloqueante)."""
queue_key = f"{self.prefix}queue:{agent_name}"
# Esperar por un mensaje con timeout
result = await self.redis.brpop(queue_key, timeout=timeout)
if result is None:
return None
_, message_id = result
message_key = f"{self.prefix}msg:{message_id}"
message_data = await self.redis.get(message_key)
if message_data is None:
return None # Mensaje expiró
message = json.loads(message_data)
message["status"]["code"] = "processing"
await self.redis.setex(message_key, message["ttl_seconds"],
json.dumps(message))
return message
async def respond(
self,
original_message: dict,
payload: dict,
status: str = "completed"
) -> str:
"""Responde al remitente de un mensaje."""
return await self.send(
recipient=original_message["source"]["agent"],
sender=original_message["target"]["agent"],
message_type="task_result" if status == "completed" else "task_error",
payload={
"task_id": original_message["payload"].get("task_id"),
"status": status,
"data": payload
},
ttl=original_message.get("ttl_seconds", 300)
)
async def publish(self, topic: str, sender: str, payload: dict):
"""Publica un mensaje en un canal pub-sub."""
channel = f"{self.prefix}pubsub:{topic}"
message = {
"source": {"agent": sender},
"timestamp": datetime.now(timezone.utc).isoformat(),
"payload": payload
}
await self.redis.publish(channel, json.dumps(message))
async def subscribe(self, topic: str, callback):
"""Se suscribe a un canal pub-sub."""
channel = f"{self.prefix}pubsub:{topic}"
pubsub = self.redis.pubsub()
await pubsub.subscribe(channel)
async for message in pubsub.listen():
if message["type"] == "message":
await callback(json.loads(message["data"]))
# --- Ejemplo de uso ---
async def ejemplo():
bus = MessageBus()
# Agente A envía una tarea al Agente B
msg_id = await bus.send(
recipient="agente_analista",
sender="agente_coordinador",
message_type="task_request",
payload={
"task_id": "task_001",
"type": "analyze_data",
"input": {"dataset": "ventas_2026", "metricas": ["total", "avg"]}
}
)
print(f"Mensaje enviado: {msg_id}")
# Agente B recibe y procesa
msg = await bus.receive("agente_analista")
if msg:
print(f"Procesando: {msg['payload']['task_id']}")
# ... procesar ...
await bus.respond(msg, {"resultado": "análisis completado"})
await bus.redis.close()
Gestión de errores y resiliencia
En comunicación entre agentes, los errores son inevitables. Lo importante es tener un plan para cuando ocurran. Aquí están las estrategias que usamos en producción:
1. Dead Letter Queue (DLQ)
Los mensajes que no se pueden procesar después de N reintentos van a una cola especial para revisión. Nunca se pierden, quedan disponibles para diagnóstico.
async def procesar_con_dlq(bus: MessageBus, agente, max_attempts: int = 3):
"""Procesa mensajes con dead letter queue."""
while True:
msg = await bus.receive(agente.nombre)
if msg is None:
continue
try:
resultado = await agente.procesar(msg["payload"])
await bus.respond(msg, resultado)
print(f"[{agente.nombre}] Tarea {msg['payload']['task_id']} completada")
except Exception as e:
attempt = msg["status"]["attempt"]
if attempt < max_attempts:
# Reintentar
msg["status"]["attempt"] = attempt + 1
print(f"[{agente.nombre}] Reintento {attempt + 1}/{max_attempts}")
await bus.redis.lpush(f"agents:queue:{agente.nombre}",
msg["message_id"])
else:
# Enviar a DLQ
dlq_key = f"agents:dlq:{agente.nombre}"
msg["status"]["code"] = "failed"
msg["status"]["error"] = str(e)
await bus.redis.lpush(dlq_key, json.dumps(msg))
print(f"[{agente.nombre}] Enviado a DLQ: {e}")
2. Timeouts en todas las comunicaciones
Cada mensaje debe tener un TTL. Si un agente no lo procesa a tiempo, se descarta o se reasigna. Esto evita que un agente congelado bloquee todo el sistema.
3. Health checks y heartbeats
Cada agente debe emitir heartbeats periódicos. Si un agente no envía heartbeat durante N segundos, el orquestador lo marca como caído y redistribuye sus tareas.
async def monitor_health(bus: MessageBus, agentes: list, timeout: int = 30):
"""Monitoriza la salud de los agentes mediante heartbeats."""
while True:
for agente in agentes:
last_heartbeat = await bus.redis.get(
f"agents:heartbeat:{agente.nombre}"
)
if last_heartbeat is None:
print(f"[MONITOR] {agente.nombre} no responde!")
# Reasignar tareas pendientes
await reasignar_tareas(bus, agente.nombre)
await asyncio.sleep(10)
4. Validación de esquemas
Usa JSON Schema para validar que los mensajes cumplen el contrato antes de procesarlos. Esto detecta errores de formato temprano y evita fallos crípticos más adelante.
MESSAGE_SCHEMA = {
"type": "object",
"required": ["protocol_version", "message_id", "source", "payload"],
"properties": {
"protocol_version": {"type": "string", "pattern": "^\\d+\\.\\d+$"},
"message_id": {"type": "string", "minLength": 8},
"source": {
"type": "object",
"required": ["agent"],
"properties": {
"agent": {"type": "string"},
"crew": {"type": "string"}
}
},
"ttl_seconds": {"type": "integer", "minimum": 1, "maximum": 86400},
"payload": {"type": "object"}
}
}
def validar_mensaje(mensaje: dict) -> bool:
"""Valida un mensaje contra el esquema."""
try:
jsonschema.validate(instance=mensaje, schema=MESSAGE_SCHEMA)
return True
except jsonschema.ValidationError as e:
print(f"Error de validación: {e}")
return False
Estas prácticas de comunicación y resiliencia son las que separan un sistema multi-agente amateur de uno profesional. Si todo esto te parece mucha complejidad para gestionar tú mismo, plataformas como MakeYourCrew lo incluyen de serie: colas de mensajes, dead letter queues, heartbeats y validación de esquemas vienen integradas.
¿Listo para construir tu sistema multi-agente sin preocuparte por la comunicación?
MakeYourCrew incluye sistema de mensajería entre agentes, colas gestionadas, dead letter queues y monitorización en tiempo real. Para que te centres en la lógica de tus agentes, no en la infraestructura de comunicación.
Unirme a la waitlistPlan gratuito disponible · 3 agentes incluidos · Sin tarjeta de crédito
Preguntas frecuentes
¿Cómo se comunican los agentes de IA entre sí?
Mediante paso de mensajes estructurados, normalmente en JSON. Los mensajes incluyen información sobre el remitente, destinatario, tipo de mensaje, payload y metadatos de control. La transmisión puede ser directa (HTTP/GRPC) o a través de colas de mensajes (Redis, RabbitMQ, Kafka).
¿Qué formato de mensaje es mejor para la comunicación entre agentes?
JSON es el formato más recomendado para la mayoría de los casos por su legibilidad, flexibilidad y compatibilidad. Para sistemas de alto rendimiento, Protocol Buffers ofrece mejor eficiencia. Para comunicación con LLMs, el markdown estructurado es una buena alternativa.
¿Qué es una cola de mensajes y por qué la necesito?
Una cola de mensajes es un sistema de almacenamiento temporal que desacopla el envío y recepción de mensajes. La necesitas cuando tienes múltiples agentes que se comunican de forma asíncrona, para evitar que un agente lento bloquee a los demás y para garantizar que ningún mensaje se pierda.
¿Cómo manejar la comunicación cuando un agente falla?
Implementa dead letter queues para mensajes no procesables, reintentos con backoff exponencial, timeouts configurables y un sistema de heartbeats para detectar agentes caídos. La clave es diseñar para el fallo desde el principio.
¿Deben los agentes comunicarse directamente o a través de un orquestador?
Para sistemas pequeños (2-5 agentes), la comunicación directa es aceptable. Para sistemas medianos y grandes, usar un orquestador o colas de mensajes centralizadas facilita la monitorización, el debugging y el escalado. MakeYourCrew usa un bus de mensajes gestionado que escala automáticamente.
📚 Sigue leyendo: Orquestar múltiples agentes de IA · Desplegar tu primer agente IA · Crear equipo de agentes IA · Guía completa de sistemas multi-agente