Mejores Prácticas para Comunicación entre Agentes de IA

Técnicas, protocolos y formatos para que tus agentes de IA se comuniquen de forma eficiente. Con ejemplos de código JSON, Python y patrones probados en producción.

Índice de contenidos
📅 21 junio 2026 · 📖 ~2400 palabras · 🎯 Cluster C: Tutoriales para Developers

En un sistema multi-agente, la comunicación no es un detalle menor: es el sistema circulatorio. Si los agentes no se comunican bien, el sistema entero falla. Mensajes perdidos, formatos incompatibles, timeouts mal configurados — cada uno de estos problemas puede tumbar un sistema en producción.

Esta guía recoge las mejores prácticas que hemos aprendido diseñando sistemas multi-agente en producción. Cubrimos desde el formato básico de un mensaje hasta patrones avanzados de comunicación asíncrona, con ejemplos de código que puedes implementar hoy.

🎯 Prerrequisito: Esta guía asume que ya conoces los conceptos básicos de sistemas multi-agente. Si vienes de cero, te recomendamos leer primero la guía completa de sistemas multi-agente y el artículo sobre cómo orquestar múltiples agentes.

Fundamentos de la comunicación entre agentes

Antes de hablar de formatos y protocolos, necesitamos entender los principios fundamentales que toda comunicación entre agentes debería seguir:

  1. Mensajes autónomos: Cada mensaje debe ser autocontenido. Quien lo recibe debe poder procesarlo sin necesidad de contexto externo.
  2. Identificación única: Cada mensaje necesita un ID único para tracking, debugging y deduplicación.
  3. Contrato explícito: Los agentes deben acordar un esquema de mensaje. Sin contrato, hay caos.
  4. Asincronía por defecto: Los agentes no deberían bloquearse esperando respuestas. La comunicación asíncrona es más resiliente.
  5. Manejo de errores: Los mensajes fallidos no se pierden. Van a una cola de revisión o se reintentan.

Estos principios aplican independientemente del formato o protocolo que uses. Violar cualquiera de ellos te traerá problemas a medida que el sistema crezca.

"La comunicación entre agentes es como un contrato entre servicios. Si no está bien definido, alguien va a incumplirlo."

Formatos de mensaje: JSON, Protocol Buffers y más

El formato del mensaje determina cómo se serializa, transmite y deserializa la información entre agentes. Cada formato tiene sus ventajas y desventajas:

Formato Tamaño Velocidad Legibilidad Esquema Ecosistema
JSON Grande Media ⭐⭐⭐ Excelente Opcional (JSON Schema) Universal
Protocol Buffers Pequeño Alta ⭐⭐ Baja (binario) Obligatorio (.proto) Multi-lenguaje
MessagePack Pequeño Alta ⭐⭐ Media Opcional Multi-lenguaje
YAML Grande Baja ⭐⭐⭐ Excelente Opcional Configuración
Markdown estructurado Grande Baja ⭐⭐⭐ Excelente No formal LLMs

Nuestra recomendación

Para el 90% de los casos, JSON es la opción correcta. Es legible, universal, fácil de debuggear y todas las herramientas lo soportan. Solo considera Protocol Buffers si necesitas rendimiento extremo o estás transmitiendo volúmenes masivos de datos.

Para mensajes que serán procesados por LLMs (no por código), el markdown estructurado puede ser una mejor opción, ya que los modelos lo entienden de forma natural. Pero para comunicación entre agentes y orquestadores, JSON es el estándar de facto.

Esquema de mensaje recomendado

Después de construir varios sistemas multi-agente, hemos convergido en este esquema de mensaje que cubre el 95% de los casos de uso:

// Ejemplo completo de mensaje entre agentes
{
  "protocol_version": "1.0",
  "message_id": "msg_2kf8d93jf4",
  "correlation_id": "corr_7h3kf92",
  // Quién envía y quién recibe
  "source": {
    "agent": "investigador_mercado",
    "crew": "crew_analisis_mercado",
    "instance_id": "inst_abc123"
  },
  "target": {
    "agent": "analista_datos",
    "crew": "crew_analisis_mercado"
  },
  // Metadatos de trazabilidad
  "timestamp": "2026-06-21T15:30:00.123Z",
  "ttl_seconds": 300,
  "priority": 3,
  // Tipo de mensaje y payload
  "message_type": "task_request",
  // task_request | task_result | task_error | heartbeat | log | control
  "payload": {
    "task_id": "task_789",
    "type": "analyze_market",
    "input": {
      "market": "LATAM",
      "sector": "fintech",
      "year": 2026
    },
    "context": {
      "session_id": "session_456",
      "previous_results": ["result_task_788"]
    }
  },
  // Estado y metadatos de ejecución
  "status": {
    "code": "pending",
    // pending | processing | completed | failed | cancelled
    "attempt": 1,
    "max_attempts": 3
  },
  // Firma para verificar integridad
  "signature": "sig_a1b2c3d4..."
}

Este esquema incluye todo lo necesario para un sistema robusto:

Patrones de comunicación

Más allá del formato del mensaje, necesitas decidir cómo se comunican los agentes. Aquí tienes los patrones más utilizados:

1. Request-Response (Síncrono)

En qué consiste: Un agente envía una solicitud y espera la respuesta.

Cuándo usarlo: Cuando necesitas una respuesta inmediata para continuar. Ideal para pipelines lineales.

Ejemplo: Agente A pide un análisis al Agente B, espera el resultado, y lo usa como entrada para su siguiente paso.

Implementación: Llamadas HTTP directas, gRPC, o funciones asíncronas con await.

2. Publish-Subscribe (Asíncrono)

En qué consiste: Un agente publica un mensaje y múltiples agentes suscritos lo reciben.

Cuándo usarlo: Cuando múltiples agentes necesitan reaccionar al mismo evento.

Ejemplo: Un agente detector de anomalías publica un evento "anomalía detectada" y tres agentes suscritos (alertas, logging, backup) reaccionan en paralelo.

Implementación: Redis Pub/Sub, RabbitMQ exchanges, Kafka topics.

3. Message Queue (Cola de trabajo)

En qué consiste: Los mensajes se encolan y los agentes los procesan cuando están disponibles.

Cuándo usarlo: Cuando hay desequilibrio entre la velocidad de producción y consumo de mensajes, o para distribuir carga entre agentes idénticos.

Ejemplo: 100 tareas de análisis encoladas, 3 agentes analistas las toman de la cola según su disponibilidad.

Implementación: Redis Lists, RabbitMQ queues, Amazon SQS.

4. Blackboard (Memoria compartida)

En qué consiste: Los agentes leen y escriben en un espacio de memoria compartido.

Cuándo usarlo: Cuando múltiples agentes necesitan acceder y modificar el mismo estado de forma concurrente.

Ejemplo: Un equipo de agentes trabajando en un documento colaborativo, cada uno aporta su sección.

Implementación: Redis (con locks optimistas), base de datos compartida, sistema de archivos.

Patrón Acoplamiento Escalabilidad Complejidad Latencia
Request-Response Alto Baja Baja Baja
Pub-Sub Bajo Alta Media Media
Message Queue Bajo Muy alta Media Media-Alta
Blackboard Medio Media Alta Variable

Colas de mensajes: Redis vs RabbitMQ vs Kafka

Para sistemas multi-agente que escalan, las colas de mensajes son prácticamente obligatorias. Aquí tienes las opciones más populares y cuándo usar cada una:

Característica Redis RabbitMQ Apache Kafka
Tipo In-memory + persistencia opcional Message broker clásico Event streaming platform
Rendimiento ~100K msg/s ~50K msg/s ~1M+ msg/s
Persistencia ⚠️ Limitada ✅ Alta ✅ Muy alta
Complejidad setup 🟢 Baja 🟡 Media 🔴 Alta
Casos de uso Caching, colas simples, pub-sub ligero Colas de trabajo, routing complejo Event sourcing, streaming, logs
Ideal para agentes 2-10 agentes, prototipos 5-30 agentes, producción 20+ agentes, alta escala

Recomendación para startups

Para la mayoría de startups building sistemas multi-agente, Redis es el punto de partida ideal. Es simple de configurar, rápido y suficiente para sistemas de hasta 10-15 agentes. Cuando superes esa escala, migra a RabbitMQ o directamente a Kafka si necesitas event sourcing.

Si usas MakeYourCrew, no necesitas preocuparte por esto: la plataforma incluye colas de mensajes gestionadas que escalan automáticamente con tu sistema.

Implementación práctica en Python

Vamos a implementar un sistema de comunicación entre agentes usando Redis como cola de mensajes. Este es un patrón que funciona bien para equipos de 3-10 agentes.

import json
import uuid
from datetime import datetime, timezone
from typing import Optional
import redis.asyncio as redis


class MessageBus:
    """Sistema de comunicación entre agentes usando Redis."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.prefix = "agents:"

    async def send(
        self,
        recipient: str,
        sender: str,
        message_type: str,
        payload: dict,
        ttl: int = 300
    ) -> str:
        """Envía un mensaje a un agente específico."""
        message_id = f"msg_{uuid.uuid4().hex[:12]}"
        message = {
            "protocol_version": "1.0",
            "message_id": message_id,
            "source": {"agent": sender},
            "target": {"agent": recipient},
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "ttl_seconds": ttl,
            "message_type": message_type,
            "payload": payload,
            "status": {"code": "pending", "attempt": 1, "max_attempts": 3}
        }

        queue_key = f"{self.prefix}queue:{recipient}"
        message_key = f"{self.prefix}msg:{message_id}"

        # Almacenar mensaje con TTL
        await self.redis.setex(message_key, ttl, json.dumps(message))
        # Encolar referencia al mensaje
        await self.redis.lpush(queue_key, message_id)

        return message_id

    async def receive(self, agent_name: str, timeout: int = 5) -> Optional[dict]:
        """Recibe el siguiente mensaje para un agente (bloqueante)."""
        queue_key = f"{self.prefix}queue:{agent_name}"

        # Esperar por un mensaje con timeout
        result = await self.redis.brpop(queue_key, timeout=timeout)
        if result is None:
            return None

        _, message_id = result
        message_key = f"{self.prefix}msg:{message_id}"
        message_data = await self.redis.get(message_key)

        if message_data is None:
            return None  # Mensaje expiró

        message = json.loads(message_data)
        message["status"]["code"] = "processing"
        await self.redis.setex(message_key, message["ttl_seconds"],
                              json.dumps(message))

        return message

    async def respond(
        self,
        original_message: dict,
        payload: dict,
        status: str = "completed"
    ) -> str:
        """Responde al remitente de un mensaje."""
        return await self.send(
            recipient=original_message["source"]["agent"],
            sender=original_message["target"]["agent"],
            message_type="task_result" if status == "completed" else "task_error",
            payload={
                "task_id": original_message["payload"].get("task_id"),
                "status": status,
                "data": payload
            },
            ttl=original_message.get("ttl_seconds", 300)
        )

    async def publish(self, topic: str, sender: str, payload: dict):
        """Publica un mensaje en un canal pub-sub."""
        channel = f"{self.prefix}pubsub:{topic}"
        message = {
            "source": {"agent": sender},
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "payload": payload
        }
        await self.redis.publish(channel, json.dumps(message))

    async def subscribe(self, topic: str, callback):
        """Se suscribe a un canal pub-sub."""
        channel = f"{self.prefix}pubsub:{topic}"
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)

        async for message in pubsub.listen():
            if message["type"] == "message":
                await callback(json.loads(message["data"]))


# --- Ejemplo de uso ---
async def ejemplo():
    bus = MessageBus()

    # Agente A envía una tarea al Agente B
    msg_id = await bus.send(
        recipient="agente_analista",
        sender="agente_coordinador",
        message_type="task_request",
        payload={
            "task_id": "task_001",
            "type": "analyze_data",
            "input": {"dataset": "ventas_2026", "metricas": ["total", "avg"]}
        }
    )
    print(f"Mensaje enviado: {msg_id}")

    # Agente B recibe y procesa
    msg = await bus.receive("agente_analista")
    if msg:
        print(f"Procesando: {msg['payload']['task_id']}")
        # ... procesar ...
        await bus.respond(msg, {"resultado": "análisis completado"})

    await bus.redis.close()
💡 Para producción: Este ejemplo didáctico usa Redis. En producción, añade validación de esquemas (JSON Schema), manejo de dead letter queues, monitorización de latencia de mensajes y logging estructurado. MakeYourCrew incluye todo esto por defecto.

Gestión de errores y resiliencia

En comunicación entre agentes, los errores son inevitables. Lo importante es tener un plan para cuando ocurran. Aquí están las estrategias que usamos en producción:

1. Dead Letter Queue (DLQ)

Los mensajes que no se pueden procesar después de N reintentos van a una cola especial para revisión. Nunca se pierden, quedan disponibles para diagnóstico.

async def procesar_con_dlq(bus: MessageBus, agente, max_attempts: int = 3):
    """Procesa mensajes con dead letter queue."""
    while True:
        msg = await bus.receive(agente.nombre)
        if msg is None:
            continue

        try:
            resultado = await agente.procesar(msg["payload"])
            await bus.respond(msg, resultado)
            print(f"[{agente.nombre}] Tarea {msg['payload']['task_id']} completada")

        except Exception as e:
            attempt = msg["status"]["attempt"]
            if attempt < max_attempts:
                # Reintentar
                msg["status"]["attempt"] = attempt + 1
                print(f"[{agente.nombre}] Reintento {attempt + 1}/{max_attempts}")
                await bus.redis.lpush(f"agents:queue:{agente.nombre}",
                                     msg["message_id"])
            else:
                # Enviar a DLQ
                dlq_key = f"agents:dlq:{agente.nombre}"
                msg["status"]["code"] = "failed"
                msg["status"]["error"] = str(e)
                await bus.redis.lpush(dlq_key, json.dumps(msg))
                print(f"[{agente.nombre}] Enviado a DLQ: {e}")

2. Timeouts en todas las comunicaciones

Cada mensaje debe tener un TTL. Si un agente no lo procesa a tiempo, se descarta o se reasigna. Esto evita que un agente congelado bloquee todo el sistema.

3. Health checks y heartbeats

Cada agente debe emitir heartbeats periódicos. Si un agente no envía heartbeat durante N segundos, el orquestador lo marca como caído y redistribuye sus tareas.

async def monitor_health(bus: MessageBus, agentes: list, timeout: int = 30):
    """Monitoriza la salud de los agentes mediante heartbeats."""
    while True:
        for agente in agentes:
            last_heartbeat = await bus.redis.get(
                f"agents:heartbeat:{agente.nombre}"
            )
            if last_heartbeat is None:
                print(f"[MONITOR] {agente.nombre} no responde!")
                # Reasignar tareas pendientes
                await reasignar_tareas(bus, agente.nombre)
        await asyncio.sleep(10)

4. Validación de esquemas

Usa JSON Schema para validar que los mensajes cumplen el contrato antes de procesarlos. Esto detecta errores de formato temprano y evita fallos crípticos más adelante.

MESSAGE_SCHEMA = {
    "type": "object",
    "required": ["protocol_version", "message_id", "source", "payload"],
    "properties": {
        "protocol_version": {"type": "string", "pattern": "^\\d+\\.\\d+$"},
        "message_id": {"type": "string", "minLength": 8},
        "source": {
            "type": "object",
            "required": ["agent"],
            "properties": {
                "agent": {"type": "string"},
                "crew": {"type": "string"}
            }
        },
        "ttl_seconds": {"type": "integer", "minimum": 1, "maximum": 86400},
        "payload": {"type": "object"}
    }
}

def validar_mensaje(mensaje: dict) -> bool:
    """Valida un mensaje contra el esquema."""
    try:
        jsonschema.validate(instance=mensaje, schema=MESSAGE_SCHEMA)
        return True
    except jsonschema.ValidationError as e:
        print(f"Error de validación: {e}")
        return False

Estas prácticas de comunicación y resiliencia son las que separan un sistema multi-agente amateur de uno profesional. Si todo esto te parece mucha complejidad para gestionar tú mismo, plataformas como MakeYourCrew lo incluyen de serie: colas de mensajes, dead letter queues, heartbeats y validación de esquemas vienen integradas.

¿Listo para construir tu sistema multi-agente sin preocuparte por la comunicación?

MakeYourCrew incluye sistema de mensajería entre agentes, colas gestionadas, dead letter queues y monitorización en tiempo real. Para que te centres en la lógica de tus agentes, no en la infraestructura de comunicación.

Unirme a la waitlist

Plan gratuito disponible · 3 agentes incluidos · Sin tarjeta de crédito

Preguntas frecuentes

¿Cómo se comunican los agentes de IA entre sí?

Mediante paso de mensajes estructurados, normalmente en JSON. Los mensajes incluyen información sobre el remitente, destinatario, tipo de mensaje, payload y metadatos de control. La transmisión puede ser directa (HTTP/GRPC) o a través de colas de mensajes (Redis, RabbitMQ, Kafka).

¿Qué formato de mensaje es mejor para la comunicación entre agentes?

JSON es el formato más recomendado para la mayoría de los casos por su legibilidad, flexibilidad y compatibilidad. Para sistemas de alto rendimiento, Protocol Buffers ofrece mejor eficiencia. Para comunicación con LLMs, el markdown estructurado es una buena alternativa.

¿Qué es una cola de mensajes y por qué la necesito?

Una cola de mensajes es un sistema de almacenamiento temporal que desacopla el envío y recepción de mensajes. La necesitas cuando tienes múltiples agentes que se comunican de forma asíncrona, para evitar que un agente lento bloquee a los demás y para garantizar que ningún mensaje se pierda.

¿Cómo manejar la comunicación cuando un agente falla?

Implementa dead letter queues para mensajes no procesables, reintentos con backoff exponencial, timeouts configurables y un sistema de heartbeats para detectar agentes caídos. La clave es diseñar para el fallo desde el principio.

¿Deben los agentes comunicarse directamente o a través de un orquestador?

Para sistemas pequeños (2-5 agentes), la comunicación directa es aceptable. Para sistemas medianos y grandes, usar un orquestador o colas de mensajes centralizadas facilita la monitorización, el debugging y el escalado. MakeYourCrew usa un bus de mensajes gestionado que escala automáticamente.

📚 Sigue leyendo: Orquestar múltiples agentes de IA · Desplegar tu primer agente IA · Crear equipo de agentes IA · Guía completa de sistemas multi-agente

M

MakeYourCrew Team

Somos el equipo de MakeYourCrew, la plataforma todo-en-uno para desplegar, orquestar y monitorizar equipos de agentes de IA. Escribimos sobre sistemas multi-agente, arquitectura de agentes y mejores prácticas para developers que quieren construir con IA.