A Coding Guide to Building a Scalable Multi-Agent Communication System Using Agent Communication Protocol (ACP)


In this tutorial, we implement the Agent Communication Protocol (ACP) by building a flexible, ACP-compliant messaging system in Python, using Google’s Gemini API for natural language processing. Beginning with the installation and configuration of the google-generativeai library, the tutorial introduces the core abstractions: message types, performatives, and the ACPMessage data class, which standardizes inter-agent communication. By defining the ACPAgent and ACPMessageBroker classes, the guide demonstrates how to create, send, route, and process structured messages among multiple autonomous agents. Through code examples, readers learn to implement querying, action requests, and information broadcasting while maintaining conversation threads, acknowledgments, and error handling.

import google.generativeai as genai
import json
import time
import uuid
from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict


GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)

We import the essential Python modules, ranging from JSON handling and timing to unique identifier generation and type annotations, to support a structured ACP implementation. We then set a placeholder for the user’s Gemini API key and configure the google-generativeai client for subsequent calls to the Gemini language model.
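Hardcoding the key in the notebook is convenient for a demo, but it is easy to leak. As a minimal sketch, the key could instead be read from an environment variable; the helper name `load_api_key` and the variable name `GEMINI_API_KEY` are assumptions made for this example, not part of the tutorial's code.

```python
import os


def load_api_key(env_var: str = "GEMINI_API_KEY") -> str:
    """Return the API key stored in the given environment variable.

    The helper and the default variable name are hypothetical; adapt them
    to however your environment exposes secrets.
    """
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable before running the demo")
    return key
```

The returned string can then be passed to genai.configure(api_key=...) in place of the placeholder.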

class ACPMessageType(Enum):
    """Standard ACP message types"""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"

The ACPMessageType enumeration defines the core message categories used in the Agent Communication Protocol: requests, responses, informational broadcasts, queries, and control actions such as subscription management, error signaling, and acknowledgments. Centralizing these message types lets the system handle and route inter-agent communications consistently.
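One practical benefit of an enumeration is that incoming strings can be validated against it. The sketch below redefines a subset of the message types so that it runs on its own; `parse_message_type` is a hypothetical helper, not something the tutorial defines.

```python
from enum import Enum
from typing import Optional


class ACPMessageType(Enum):
    """Subset of the message types, redefined so the sketch is self-contained."""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    ERROR = "error"
    ACK = "acknowledge"


def parse_message_type(raw: str) -> Optional[ACPMessageType]:
    """Hypothetical helper: map a wire string to an enum member, or None."""
    try:
        # Enum lookup by value raises ValueError for unknown strings
        return ACPMessageType(raw.strip().lower())
    except ValueError:
        return None
```

Returning None for unknown strings leaves it to the caller to decide whether to drop the message or answer with an error.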

class ACPPerformative(Enum):
    """ACP speech acts (performatives)"""
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"

The ACPPerformative enumeration captures the speech acts agents can use when interacting under the ACP framework, mapping high-level intentions, such as making requests, posing questions, giving commands, or negotiating agreements, onto standardized labels. This clear taxonomy lets agents interpret and respond to messages in contextually appropriate ways, supporting robust and semantically rich communication.
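To illustrate how an intention maps onto a label, the sketch below answers an action request with an AGREE or REFUSE performative. This is an alternative shown for illustration only: the tutorial's agent reports the outcome as a status field inside a REPLY message instead. The function `answer_request` and its exact-match policy are assumptions.

```python
from enum import Enum
from typing import List


class ACPPerformative(Enum):
    """Subset of the performatives, redefined so the sketch is self-contained."""
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"


def answer_request(action: str, capabilities: List[str]) -> ACPPerformative:
    """Hypothetical policy: agree only when the action is a listed capability."""
    if action.lower() in (c.lower() for c in capabilities):
        return ACPPerformative.AGREE
    return ACPPerformative.REFUSE
```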

@dataclass
class ACPMessage:
    """Agent Communication Protocol Message Structure"""
    message_id: str
    sender: str
    receiver: str
    performative: str
    content: Dict[str, Any]
    protocol: str = "ACP-1.0"
    conversation_id: Optional[str] = None
    reply_to: Optional[str] = None
    language: str = "english"
    encoding: str = "json"
    timestamp: Optional[float] = None

    def __post_init__(self):
        # Fill in defaults that have to be computed per message
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())

    def to_acp_format(self) -> str:
        """Convert to standard ACP message format"""
        acp_msg = {
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "protocol": self.protocol,
            "conversation-id": self.conversation_id,
            "reply-to": self.reply_to,
            "language": self.language,
            "encoding": self.encoding,
            "timestamp": self.timestamp
        }
        return json.dumps(acp_msg, indent=2)

    @classmethod
    def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
        """Parse ACP message from string format"""
        data = json.loads(acp_string)
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            protocol=data.get("protocol", "ACP-1.0"),
            conversation_id=data.get("conversation-id"),
            reply_to=data.get("reply-to"),
            language=data.get("language", "english"),
            encoding=data.get("encoding", "json"),
            timestamp=data.get("timestamp", time.time())
        )

The ACPMessage data class encapsulates all the fields required for a structured ACP exchange, including identifiers, participants, performative, payload, and metadata such as protocol version, language, and timestamps. Its __post_init__ method auto-populates missing timestamp and conversation_id values, so every message carries a timestamp and belongs to a conversation. The utility methods to_acp_format and from_acp_format handle serialization to and from the standardized JSON representation for transmission and parsing.
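A quick way to check a serialization pair is a round trip: serialize a message, parse it back, and compare. The sketch below uses a trimmed stand-in class, `MiniACPMessage` (a hypothetical name with fewer fields than the tutorial's class), so that it runs without the rest of the code.

```python
import json
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class MiniACPMessage:
    """Trimmed stand-in for ACPMessage (hypothetical name, fewer fields)."""
    message_id: str
    sender: str
    receiver: str
    performative: str
    content: Dict[str, Any]
    conversation_id: Optional[str] = None
    timestamp: Optional[float] = None

    def __post_init__(self):
        # Same defaulting rule as the tutorial's class
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())

    def to_acp_format(self) -> str:
        return json.dumps({
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "conversation-id": self.conversation_id,
            "timestamp": self.timestamp,
        })

    @classmethod
    def from_acp_format(cls, acp_string: str) -> "MiniACPMessage":
        data = json.loads(acp_string)
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            conversation_id=data.get("conversation-id"),
            timestamp=data.get("timestamp"),
        )


original = MiniACPMessage("m-1", "agent-001", "agent-002", "ask",
                          {"question": "ping?"})
restored = MiniACPMessage.from_acp_format(original.to_acp_format())
print(restored == original)  # dataclass equality compares every field
```

Because the serialized form includes the conversation ID and timestamp, the parsed copy keeps the values generated for the original instead of receiving fresh ones.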

class ACPAgent:
    """Agent implementing Agent Communication Protocol"""

    def __init__(self, agent_id: str, name: str, capabilities: List[str]):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.message_queue: List[ACPMessage] = []
        self.subscriptions: Dict[str, List[str]] = {}
        self.conversations: Dict[str, List[ACPMessage]] = {}

    def create_message(self, receiver: str, performative: str,
                      content: Dict[str, Any], conversation_id: str = None,
                      reply_to: str = None) -> ACPMessage:
        """Create a new ACP-compliant message"""
        return ACPMessage(
            message_id=str(uuid.uuid4()),
            sender=self.agent_id,
            receiver=receiver,
            performative=performative,
            content=content,
            conversation_id=conversation_id,
            reply_to=reply_to
        )

    def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
        """Send an INFORM message (telling someone a fact)"""
        content = {"fact": fact, "data": data}
        return self.create_message(receiver, ACPPerformative.TELL.value, content)

    def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
        """Send a QUERY message (asking for information)"""
        content = {"question": question, "query-type": query_type}
        return self.create_message(receiver, ACPPerformative.ASK.value, content)

    def send_request(self, receiver: str, action: str, parameters: Dict = None) -> ACPMessage:
        """Send a REQUEST message (asking someone to perform an action)"""
        content = {"action": action, "parameters": parameters or {}}
        return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)

    def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
        """Send a REPLY message in response to another message"""
        content = {"response": response_data, "original-question": original_msg.content}
        return self.create_message(
            original_msg.sender,
            ACPPerformative.REPLY.value,
            content,
            conversation_id=original_msg.conversation_id,
            reply_to=original_msg.message_id
        )

    def process_message(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Process incoming ACP message and generate appropriate response"""
        self.message_queue.append(message)

        # Record the message under its conversation thread
        conv_id = message.conversation_id
        if conv_id not in self.conversations:
            self.conversations[conv_id] = []
        self.conversations[conv_id].append(message)

        # Dispatch on the performative; other performatives get no response
        if message.performative == ACPPerformative.ASK.value:
            return self._handle_query(message)
        elif message.performative == ACPPerformative.REQUEST_ACTION.value:
            return self._handle_request(message)
        elif message.performative == ACPPerformative.TELL.value:
            return self._handle_inform(message)

        return None

    def _handle_query(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming query messages"""
        question = message.content.get("question", "")

        prompt = f"As agent {self.name} with capabilities {self.capabilities}, answer: {question}"
        try:
            response = self.model.generate_content(prompt)
            answer = response.text.strip()
        except Exception:
            # Fall back to a fixed answer if the Gemini call fails
            answer = "Unable to process query at this time"

        return self.send_reply(message, {"answer": answer, "confidence": 0.8})

    def _handle_request(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming action requests"""
        action = message.content.get("action", "")
        parameters = message.content.get("parameters", {})

        # Substring match: a capability string must appear inside the action name
        if any(capability in action.lower() for capability in self.capabilities):
            result = f"Executing {action} with parameters {parameters}"
            status = "agreed"
        else:
            result = f"Cannot perform {action} - not in my capabilities"
            status = "refused"

        return self.send_reply(message, {"status": status, "result": result})

    def _handle_inform(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Handle incoming information messages"""
        fact = message.content.get("fact", "")
        print(f"[{self.name}] Received information: {fact}")

        ack_content = {"status": "received", "fact": fact}
        return self.create_message(message.sender, "acknowledge", ack_content,
                                 conversation_id=message.conversation_id)

The ACPAgent class encapsulates an autonomous entity capable of sending, receiving, and processing ACP-compliant messages using Gemini’s language model. It manages its own message queue, conversation history, and subscriptions, and provides helper methods (send_inform, send_query, send_request, send_reply) to construct correctly formatted ACPMessage instances. Incoming messages are routed through process_message, which delegates to specialized handlers for queries, action requests, and informational messages.
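The request handler accepts an action only when one of the agent's capability strings occurs inside the action name, so an action named "calculate" is not matched by a capability named "calculation". The sketch below shows one possible, more forgiving matcher; `can_handle`, `shared_prefix_len`, and the `min_prefix` threshold are hypothetical and are not part of the tutorial's agent.

```python
from typing import List


def shared_prefix_len(a: str, b: str) -> int:
    """Length of the common leading substring of a and b."""
    n = 0
    for x, y in zip(a, b):
        if x != y:
            break
        n += 1
    return n


def can_handle(action: str, capabilities: List[str], min_prefix: int = 5) -> bool:
    """Hypothetical matcher: substring in either direction, or a long shared prefix."""
    action = action.lower()
    if not action:
        return False
    for capability in capabilities:
        capability = capability.lower()
        if capability in action or action in capability:
            return True
        # "calculate" and "calculation" share the prefix "calculat"
        if shared_prefix_len(action, capability) >= min_prefix:
            return True
    return False
```

Prefix matching is a heuristic and can produce false positives for unrelated words with a common stem; an explicit mapping from action names to capabilities would be stricter.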

class ACPMessageBroker:
    """Message broker implementing ACP routing and delivery"""

    def __init__(self):
        self.agents: Dict[str, ACPAgent] = {}
        self.message_log: List[ACPMessage] = []
        self.routing_table: Dict[str, str] = {}

    def register_agent(self, agent: ACPAgent):
        """Register an agent with the message broker"""
        self.agents[agent.agent_id] = agent
        self.routing_table[agent.agent_id] = "local"
        print(f"✓ Registered agent: {agent.name} ({agent.agent_id})")

    def route_message(self, message: ACPMessage) -> bool:
        """Route ACP message to appropriate recipient"""
        if message.receiver not in self.agents:
            print(f"✗ Receiver {message.receiver} not found")
            return False

        print(f"\n📹 ACP MESSAGE ROUTING:")
        print(f"From: {message.sender} → To: {message.receiver}")
        print(f"Performative: {message.performative}")
        print(f"Content: {json.dumps(message.content, indent=2)}")

        receiver_agent = self.agents[message.receiver]
        response = receiver_agent.process_message(message)

        self.message_log.append(message)

        if response:
            print(f"\nđŸ“€ GENERATED RESPONSE:")
            print(f"From: {response.sender} → To: {response.receiver}")
            print(f"Content: {json.dumps(response.content, indent=2)}")

            # Deliver the response to the original sender if it is registered
            if response.receiver in self.agents:
                self.agents[response.receiver].process_message(response)
                self.message_log.append(response)

        return True

    def broadcast_message(self, message: ACPMessage, recipients: List[str]):
        """Broadcast message to multiple recipients"""
        for recipient in recipients:
            # Each recipient gets its own copy with a fresh message ID
            msg_copy = ACPMessage(
                message_id=str(uuid.uuid4()),
                sender=message.sender,
                receiver=recipient,
                performative=message.performative,
                content=message.content.copy(),
                conversation_id=message.conversation_id
            )
            self.route_message(msg_copy)

The ACPMessageBroker serves as the central router for ACP messages, maintaining a registry of agents and a message log. It provides methods to register agents, to deliver individual messages via route_message (which handles lookup, logging, and response chaining), and to send the same message to multiple recipients with broadcast_message.
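The routing logic can be exercised without Gemini by substituting plain functions for agents. The sketch below is a simplified stand-in, not the tutorial's broker: `MiniBroker`, the dictionary-based messages, and the handler signature are all assumptions, and responses are only logged instead of being delivered back to the sender.

```python
from typing import Callable, Dict, List, Optional

Message = Dict[str, str]
Handler = Callable[[Message], Optional[Message]]


class MiniBroker:
    """Hypothetical in-memory router keyed by receiver ID."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}
        self.log: List[Message] = []

    def register(self, agent_id: str, handler: Handler) -> None:
        self.handlers[agent_id] = handler

    def route(self, message: Message) -> bool:
        handler = self.handlers.get(message["receiver"])
        if handler is None:
            return False  # unknown receiver: nothing is logged
        self.log.append(message)
        response = handler(message)
        if response is not None:
            self.log.append(response)
        return True

    def broadcast(self, message: Message, recipients: List[str]) -> int:
        """Send a per-recipient copy and return how many were delivered."""
        delivered = 0
        for recipient in recipients:
            if self.route(dict(message, receiver=recipient)):
                delivered += 1
        return delivered


def echo(message: Message) -> Message:
    # Stub agent: reply to the sender with the same body
    return {"sender": message["receiver"], "receiver": message["sender"],
            "performative": "reply", "body": message["body"]}


broker = MiniBroker()
broker.register("agent-001", echo)
broker.register("agent-002", echo)
note = {"sender": "agent-003", "receiver": "", "performative": "tell", "body": "hi"}
delivered = broker.broadcast(note, ["agent-001", "agent-002", "agent-999"])
```

Returning a delivery count from the broadcast makes undeliverable recipients visible to the caller, which the printed warning alone does not.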

def demonstrate_acp():
    """Comprehensive demonstration of Agent Communication Protocol"""

    print("đŸ€– AGENT COMMUNICATION PROTOCOL (ACP) DEMONSTRATION")
    print("=" * 60)

    broker = ACPMessageBroker()

    researcher = ACPAgent("agent-001", "Dr. Research", ["analysis", "research", "data-processing"])
    assistant = ACPAgent("agent-002", "AI Assistant", ["information", "scheduling", "communication"])
    calculator = ACPAgent("agent-003", "MathBot", ["calculation", "mathematics", "computation"])

    broker.register_agent(researcher)
    broker.register_agent(assistant)
    broker.register_agent(calculator)

    print(f"\n📋 REGISTERED AGENTS:")
    for agent_id, agent in broker.agents.items():
        print(f"  ‱ {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")

    print(f"\n🔬 SCENARIO 1: Information Query (ASK performative)")
    query_msg = assistant.send_query("agent-001", "What are the key factors in AI research?")
    broker.route_message(query_msg)

    print(f"\n🔱 SCENARIO 2: Action Request (REQUEST-ACTION performative)")
    calc_request = researcher.send_request("agent-003", "calculate", {"expression": "sqrt(144) + 10"})
    broker.route_message(calc_request)

    print(f"\n📱 SCENARIO 3: Information Sharing (TELL performative)")
    info_msg = researcher.send_inform("agent-002", "New research paper published on quantum computing")
    broker.route_message(info_msg)

    print(f"\n📊 PROTOCOL STATISTICS:")
    print(f"  ‱ Total messages processed: {len(broker.message_log)}")
    print(f"  ‱ Active conversations: {len(set(msg.conversation_id for msg in broker.message_log))}")
    print(f"  ‱ Message types used: {len(set(msg.performative for msg in broker.message_log))}")

    print(f"\n📋 SAMPLE ACP MESSAGE FORMAT:")
    sample_msg = assistant.send_query("agent-001", "Sample question for format demonstration")
    print(sample_msg.to_acp_format())

The demonstrate_acp function orchestrates a hands-on walkthrough of the entire ACP framework: it initializes a broker and three distinct agents (Researcher, AI Assistant, and MathBot), registers them, and illustrates three key interaction scenarios: querying for information, requesting a computation, and sharing an update. After routing each message and handling the responses, it prints summary statistics on the message flow. Finally, it prints a formatted ACP message, giving readers a clear, end-to-end example of how agents communicate under the protocol.
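The summary statistics are derived from the conversation IDs and performatives stored in the message log. As a small sketch of the same idea, the helper below groups a log by conversation; the tuple-based log and the name `summarize_log` are assumptions standing in for the broker's list of ACPMessage objects.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# (conversation_id, performative) pairs stand in for logged messages
LogEntry = Tuple[str, str]


def summarize_log(log: List[LogEntry]) -> Dict[str, List[str]]:
    """Group performatives by conversation, preserving arrival order."""
    threads: Dict[str, List[str]] = defaultdict(list)
    for conversation_id, performative in log:
        threads[conversation_id].append(performative)
    return dict(threads)


log = [("c1", "ask"), ("c1", "reply"), ("c2", "tell"), ("c2", "acknowledge")]
threads = summarize_log(log)
total_messages = len(log)
active_conversations = len(threads)
performatives_used = len({performative for _, performative in log})
```

Grouping by conversation also makes it easy to spot threads that never received a reply or acknowledgment.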

def setup_guide():
    print("""
    🚀 GOOGLE COLAB SETUP GUIDE:
   
    1. Get Gemini API Key: https://makersuite.google.com/app/apikey
    2. Replace: GEMINI_API_KEY = "YOUR_ACTUAL_API_KEY"
    3. Run: demonstrate_acp()

    🔧 ACP PROTOCOL FEATURES:

    ‱ Standardized message format with required fields
    ‱ Speech act performatives (TELL, ASK, REQUEST-ACTION, etc.)
    ‱ Conversation tracking and message threading
    ‱ Error handling and acknowledgments
    ‱ Message routing and delivery confirmation

    📝 EXTEND THE PROTOCOL:
    ```python
    # Create custom agent
    my_agent = ACPAgent("my-001", "CustomBot", ["custom-capability"])
    broker.register_agent(my_agent)

    # Send custom message
    msg = my_agent.send_query("agent-001", "Your question here")
    broker.route_message(msg)
    ```
    """)


if __name__ == "__main__":
    setup_guide()
    demonstrate_acp() 

Finally, the setup_guide function provides a quick-start reference for running the ACP demo in Google Colab, outlining how to obtain and configure your Gemini API key and invoke the demonstrate_acp routine. It also summarizes key protocol features, such as standardized message formats, performatives, and message routing, and includes a concise code snippet showing how to register custom agents and send tailored messages.

In conclusion, this tutorial implements an ACP-based multi-agent system capable of research, computation, and collaboration tasks. The sample scenarios illustrate common use cases (information queries, computational requests, and fact sharing), while the broker handles message routing and logging. Readers are encouraged to extend the framework by adding new agent capabilities, integrating domain-specific actions, or incorporating more sophisticated subscription and notification mechanisms.
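As one possible starting point for the subscription mechanism mentioned above, the sketch below keeps a mapping from topics to subscribed agent IDs. The class `TopicRegistry` and its methods are hypothetical; the tutorial defines SUBSCRIBE and UNSUBSCRIBE message types but does not implement their handling.

```python
from collections import defaultdict
from typing import Dict, List, Set


class TopicRegistry:
    """Hypothetical topic-to-subscribers index for SUBSCRIBE/UNSUBSCRIBE handling."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, Set[str]] = defaultdict(set)

    def subscribe(self, topic: str, agent_id: str) -> None:
        self._subscribers[topic].add(agent_id)

    def unsubscribe(self, topic: str, agent_id: str) -> None:
        # discard() is a no-op when the agent was never subscribed
        self._subscribers[topic].discard(agent_id)

    def recipients(self, topic: str) -> List[str]:
        """Return subscriber IDs in a stable (sorted) order."""
        return sorted(self._subscribers.get(topic, ()))
```

The list returned by recipients could be passed as the recipients argument of broadcast_message to notify every subscriber of a topic.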


Download the Notebook on GitHub. All credit for this research goes to the researchers of this project.


Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among audiences.
