
Building a Multi-Agent AI Research Team with LangGraph and Gemini for Automated Reporting


In this tutorial, we build a complete multi-agent research team system using LangGraph and Google’s Gemini API. We use role-specific agents, Researcher, Analyst, Writer, and Supervisor, each responsible for a distinct part of the research pipeline. Together, these agents collaboratively gather information, analyze insights, synthesize a report, and coordinate the workflow. We also incorporate features such as memory persistence, agent coordination, custom agents, and performance monitoring. By the end of the setup, we can run automated, intelligent research sessions that generate structured reports on any given topic.

!pip install langgraph langchain-google-genai langchain-community langchain-core python-dotenv


import os
from typing import Annotated, List, Tuple, Union
from typing_extensions import TypedDict
import operator
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
import functools


import getpass
GOOGLE_API_KEY = getpass.getpass("Enter your Google API Key: ")
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

We begin by installing the necessary libraries, including LangGraph and LangChain’s Google Gemini integration. Then, we import the essential modules and set up our environment by securely entering the Google API key with the getpass module. This lets us authenticate our Gemini LLM without exposing the key in the code.

class AgentState(TypedDict):
    """State shared between all agents in the graph"""
    messages: Annotated[list, operator.add]
    next: str
    current_agent: str
    research_topic: str
    findings: dict
    final_report: str


class AgentResponse(TypedDict):
    """Standard response format for all agents"""
    content: str
    next_agent: str
    findings: dict


def create_llm(temperature: float = 0.1, model: str = "gemini-1.5-flash") -> ChatGoogleGenerativeAI:
    """Create a configured Gemini LLM instance"""
    return ChatGoogleGenerativeAI(
        model=model,
        temperature=temperature,
        google_api_key=os.environ["GOOGLE_API_KEY"]
    )

We define two TypedDict classes to maintain the structured state and responses shared across all agents in the LangGraph. AgentState tracks messages, workflow status, the topic, and collected findings, while AgentResponse standardizes each agent’s output. We also create a helper function that instantiates the Gemini LLM with a specified model and temperature, ensuring consistent behavior across all agents.
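Before wiring the agents together, it can help to confirm that the helper and the API key work. The following is a minimal sanity-check sketch (our own addition, not part of the original pipeline); the prompt text is arbitrary.

# Minimal sanity check: instantiate the Gemini LLM and send a single prompt.
llm = create_llm(temperature=0.0)
reply = llm.invoke("Reply with the single word: ready")
print(reply.content)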

def create_research_agent(llm: ChatGoogleGenerativeAI) -> callable:
    """Creates a research specialist agent for initial data gathering"""

    research_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a Research Specialist AI. Your role is to:
        1. Analyze the research topic thoroughly
        2. Identify key areas that need investigation
        3. Provide initial research findings and insights
        4. Suggest specific angles for deeper analysis

        Focus on providing comprehensive, accurate information and clear research directions.
        Always structure your response with clear sections and bullet points.
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Research Topic: {research_topic}")
    ])

    research_chain = research_prompt | llm

    def research_agent(state: AgentState) -> AgentState:
        """Execute research analysis"""
        try:
            response = research_chain.invoke({
                "messages": state["messages"],
                "research_topic": state["research_topic"]
            })

            findings = {
                "research_overview": response.content,
                "key_areas": ["area1", "area2", "area3"],
                "initial_insights": response.content[:500] + "..."
            }

            return {
                "messages": state["messages"] + [AIMessage(content=response.content)],
                "next": "analyst",
                "current_agent": "researcher",
                "research_topic": state["research_topic"],
                "findings": {**state.get("findings", {}), "research": findings},
                "final_report": state.get("final_report", "")
            }

        except Exception as e:
            error_msg = f"Research agent error: {str(e)}"
            return {
                "messages": state["messages"] + [AIMessage(content=error_msg)],
                "next": "analyst",
                "current_agent": "researcher",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

    return research_agent

We now create our first specialized agent, the Research Specialist AI. This agent is prompted to analyze a given topic in depth, extract key areas of interest, and suggest directions for further exploration. Using a ChatPromptTemplate, we define its behavior and connect it to our Gemini LLM. The research_agent function executes this logic, updates the shared state with findings and messages, and passes control to the next agent in line, the Analyst.
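To see what the researcher node writes back into the shared state, we can call it directly with a hand-built AgentState, outside the graph. This is an illustrative test sketch; the topic and the inspected keys simply mirror the code above.

# Build a minimal state by hand and run the researcher node on its own.
llm = create_llm()
researcher = create_research_agent(llm)

test_state: AgentState = {
    "messages": [HumanMessage(content="Research the topic: Edge AI")],
    "next": "researcher",
    "current_agent": "start",
    "research_topic": "Edge AI",
    "findings": {},
    "final_report": ""
}

new_state = researcher(test_state)
print(new_state["next"])                          # expected: "analyst"
print(new_state["findings"]["research"].keys())   # research_overview, key_areas, initial_insights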

def create_analyst_agent(llm: ChatGoogleGenerativeAI) -> callable:
    """Creates a data analyst agent for deep analysis"""

    analyst_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a Data Analyst AI. Your role is to:
        1. Analyze data and information provided by the research team
        2. Identify patterns, trends, and correlations
        3. Provide statistical insights and data-driven conclusions
        4. Suggest actionable recommendations based on analysis

        Focus on quantitative analysis, data interpretation, and evidence-based insights.
        Use clear metrics and concrete examples in your analysis.
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Analyze the research findings for: {research_topic}")
    ])

    analyst_chain = analyst_prompt | llm

    def analyst_agent(state: AgentState) -> AgentState:
        """Execute data analysis"""
        try:
            response = analyst_chain.invoke({
                "messages": state["messages"],
                "research_topic": state["research_topic"]
            })

            analysis_findings = {
                "analysis_summary": response.content,
                "key_metrics": ["metric1", "metric2", "metric3"],
                "recommendations": response.content.split("recommendations:")[-1] if "recommendations:" in response.content.lower() else "No specific recommendations found"
            }

            return {
                "messages": state["messages"] + [AIMessage(content=response.content)],
                "next": "writer",
                "current_agent": "analyst",
                "research_topic": state["research_topic"],
                "findings": {**state.get("findings", {}), "analysis": analysis_findings},
                "final_report": state.get("final_report", "")
            }

        except Exception as e:
            error_msg = f"Analyst agent error: {str(e)}"
            return {
                "messages": state["messages"] + [AIMessage(content=error_msg)],
                "next": "writer",
                "current_agent": "analyst",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

    return analyst_agent

We now define the Data Analyst AI, which digs deeper into the research findings generated by the previous agent. This agent identifies key patterns, trends, and metrics, offering actionable insights backed by evidence. Using a tailored system prompt and the Gemini LLM, the analyst_agent function enriches the state with structured analysis, laying the groundwork for the report writer to synthesize everything into a final document.

def create_writer_agent(llm: ChatGoogleGenerativeAI) -> callable:
    """Creates a report writer agent for final documentation"""

    writer_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a Report Writer AI. Your role is to:
        1. Synthesize all research and analysis into a comprehensive report
        2. Create clear, professional documentation
        3. Ensure proper structure with executive summary, findings, and conclusions
        4. Make complex information accessible to various audiences

        Focus on clarity, completeness, and professional presentation.
        Include specific examples and actionable insights.
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Create a comprehensive report for: {research_topic}")
    ])

    writer_chain = writer_prompt | llm

    def writer_agent(state: AgentState) -> AgentState:
        """Execute report writing"""
        try:
            response = writer_chain.invoke({
                "messages": state["messages"],
                "research_topic": state["research_topic"]
            })

            return {
                "messages": state["messages"] + [AIMessage(content=response.content)],
                "next": "supervisor",
                "current_agent": "writer",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": response.content
            }

        except Exception as e:
            error_msg = f"Writer agent error: {str(e)}"
            return {
                "messages": state["messages"] + [AIMessage(content=error_msg)],
                "next": "supervisor",
                "current_agent": "writer",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": f"Error generating report: {str(e)}"
            }

    return writer_agent

We now create the Report Writer AI, which is responsible for transforming the collected research and analysis into a polished, structured document. This agent synthesizes all previous insights into a clear, professional report with an executive summary, detailed findings, and conclusions. By invoking the Gemini model with a structured prompt, the writer agent stores the final report in the shared state and hands control over to the Supervisor agent for review.

def create_supervisor_agent(llm: ChatGoogleGenerativeAI, members: List[str]) -> callable:
    """Creates a supervisor agent to coordinate the team"""

    options = ["FINISH"] + members

    supervisor_prompt = ChatPromptTemplate.from_messages([
        ("system", f"""You are a Supervisor AI managing a research team. Your team members are:
        {', '.join(members)}

        Your responsibilities:
        1. Coordinate the workflow between team members
        2. Ensure each agent completes their specialized tasks
        3. Determine when the research is complete
        4. Maintain quality standards throughout the process

        Given the conversation, determine the next step:
        - If research is needed: route to "researcher"
        - If analysis is needed: route to "analyst"
        - If report writing is needed: route to "writer"
        - If work is complete: route to "FINISH"

        Available options: {options}

        Respond with just the name of the next agent or "FINISH".
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Current status: {current_agent} just completed their task for topic: {research_topic}")
    ])

    supervisor_chain = supervisor_prompt | llm

    def supervisor_agent(state: AgentState) -> AgentState:
        """Execute supervisor coordination"""
        try:
            response = supervisor_chain.invoke({
                "messages": state["messages"],
                "current_agent": state.get("current_agent", "none"),
                "research_topic": state["research_topic"]
            })

            next_agent = response.content.strip().lower()

            if "finish" in next_agent or "complete" in next_agent:
                next_step = "FINISH"
            elif "research" in next_agent:
                next_step = "researcher"
            elif "analy" in next_agent:
                next_step = "analyst"
            elif "writ" in next_agent:
                next_step = "writer"
            else:
                current = state.get("current_agent", "")
                if current == "researcher":
                    next_step = "analyst"
                elif current == "analyst":
                    next_step = "writer"
                elif current == "writer":
                    next_step = "FINISH"
                else:
                    next_step = "researcher"

            return {
                "messages": state["messages"] + [AIMessage(content=f"Supervisor decision: Next agent is {next_step}")],
                "next": next_step,
                "current_agent": "supervisor",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

        except Exception as e:
            error_msg = f"Supervisor error: {str(e)}"
            return {
                "messages": state["messages"] + [AIMessage(content=error_msg)],
                "next": "FINISH",
                "current_agent": "supervisor",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

    return supervisor_agent

We now bring in the Supervisor AI, which oversees and orchestrates the entire multi-agent workflow. This agent evaluates the current progress, knowing which team member just finished their task, and decides the next step: continue with research, proceed to analysis, start report writing, or mark the project as complete. By parsing the conversation context and using Gemini for reasoning, the supervisor agent ensures smooth transitions and quality control throughout the research pipeline.

def create_research_team_graph() -> StateGraph:
    """Creates the complete research team workflow graph"""

    llm = create_llm()

    members = ["researcher", "analyst", "writer"]
    researcher = create_research_agent(llm)
    analyst = create_analyst_agent(llm)
    writer = create_writer_agent(llm)
    supervisor = create_supervisor_agent(llm, members)

    workflow = StateGraph(AgentState)

    workflow.add_node("researcher", researcher)
    workflow.add_node("analyst", analyst)
    workflow.add_node("writer", writer)
    workflow.add_node("supervisor", supervisor)

    workflow.add_edge("researcher", "supervisor")
    workflow.add_edge("analyst", "supervisor")
    workflow.add_edge("writer", "supervisor")

    workflow.add_conditional_edges(
        "supervisor",
        lambda x: x["next"],
        {
            "researcher": "researcher",
            "analyst": "analyst",
            "writer": "writer",
            "FINISH": END
        }
    )

    workflow.set_entry_point("supervisor")

    return workflow


def compile_research_team():
    """Compile the research team graph with memory"""
    workflow = create_research_team_graph()

    memory = MemorySaver()

    app = workflow.compile(checkpointer=memory)

    return app


def run_research_team(topic: str, thread_id: str = "research_session_1"):
    """Run the complete research team workflow"""

    app = compile_research_team()

    initial_state = {
        "messages": [HumanMessage(content=f"Research the topic: {topic}")],
        "research_topic": topic,
        "next": "researcher",
        "current_agent": "start",
        "findings": {},
        "final_report": ""
    }

    config = {"configurable": {"thread_id": thread_id}}

    print(f"🔍 Starting research on: {topic}")
    print("=" * 50)

    try:
        final_state = None
        for step, state in enumerate(app.stream(initial_state, config=config)):
            print(f"\n📍 Step {step + 1}: {list(state.keys())[0]}")

            current_state = list(state.values())[0]
            if current_state["messages"]:
                last_message = current_state["messages"][-1]
                if isinstance(last_message, AIMessage):
                    print(f"💬 {last_message.content[:200]}...")

            final_state = current_state

            if step > 10:
                print("⚠️  Maximum steps reached. Stopping execution.")
                break

        return final_state

    except Exception as e:
        print(f"❌ Error during execution: {str(e)}")
        return None


We now assemble and execute the full multi-agent workflow using LangGraph. First, we define the research team graph, which consists of nodes for each agent, Researcher, Analyst, Writer, and Supervisor, connected by logical transitions. Then, we compile this graph with memory using MemorySaver to persist conversation history. Finally, the run_research_team() function initializes the process with a topic and streams execution step by step, letting us track each agent’s contribution in real time. This orchestration gives us a fully automated, collaborative research pipeline.
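Because the graph is compiled with a MemorySaver checkpointer, the state of a session stays retrievable by its thread_id as long as we reuse the same compiled app (and therefore the same in-memory checkpointer). A small sketch, assuming the standard LangGraph checkpointer API in which compiled graphs expose get_state(config); the thread_id and topic here are our own example values.

# Reuse one compiled app (one MemorySaver) so the checkpoint survives across calls.
app = compile_research_team()
config = {"configurable": {"thread_id": "demo_thread"}}

app.invoke({
    "messages": [HumanMessage(content="Research the topic: Edge AI")],
    "research_topic": "Edge AI",
    "next": "researcher",
    "current_agent": "start",
    "findings": {},
    "final_report": ""
}, config=config)

snapshot = app.get_state(config)   # StateSnapshot; .values holds the persisted AgentState dict
print(snapshot.values["current_agent"])
print(len(snapshot.values.get("final_report", "")))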

if __name__ == "__main__":
    result = run_research_team("Artificial Intelligence in Healthcare")

    if result:
        print("\n" + "=" * 50)
        print("📊 FINAL RESULTS")
        print("=" * 50)
        print(f"🏁 Final Agent: {result['current_agent']}")
        print(f"📋 Findings: {len(result['findings'])} sections")
        print(f"📄 Report Length: {len(result['final_report'])} characters")

        if result['final_report']:
            print("\n📄 FINAL REPORT:")
            print("-" * 30)
            print(result['final_report'])


def interactive_research_session():
    """Run an interactive research session"""

    app = compile_research_team()

    print("🎯 Interactive Research Team Session")
    print("Enter 'quit' to exit\n")

    session_count = 0

    while True:
        topic = input("🔍 Enter research topic: ").strip()

        if topic.lower() in ['quit', 'exit', 'q']:
            print("👋 Goodbye!")
            break

        if not topic:
            print("❌ Please enter a valid topic.")
            continue

        session_count += 1
        thread_id = f"interactive_session_{session_count}"

        result = run_research_team(topic, thread_id)

        if result and result['final_report']:
            print(f"\n✅ Research completed for: {topic}")
            print(f"📄 Report preview: {result['final_report'][:300]}...")

            show_full = input("\n📖 Show full report? (y/n): ").lower()
            if show_full.startswith('y'):
                print("\n" + "=" * 60)
                print("📄 COMPLETE RESEARCH REPORT")
                print("=" * 60)
                print(result['final_report'])

        print("\n" + "-" * 50)




def create_custom_agent(role: str, instructions: str, llm: ChatGoogleGenerativeAI) -> callable:
    """Create a custom agent with a specific role and instructions"""

    custom_prompt = ChatPromptTemplate.from_messages([
        ("system", f"""You are a {role} AI.

        Your specific instructions:
        {instructions}

        Always provide detailed, professional responses relevant to your role.
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Task: {task}")
    ])

    custom_chain = custom_prompt | llm

    def custom_agent(state: AgentState) -> AgentState:
        """Execute the custom agent's task"""
        try:
            response = custom_chain.invoke({
                "messages": state["messages"],
                "task": state["research_topic"]
            })

            return {
                "messages": state["messages"] + [AIMessage(content=response.content)],
                "next": "supervisor",
                "current_agent": role.lower().replace(" ", "_"),
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

        except Exception as e:
            error_msg = f"{role} agent error: {str(e)}"
            return {
                "messages": state["messages"] + [AIMessage(content=error_msg)],
                "next": "supervisor",
                "current_agent": role.lower().replace(" ", "_"),
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

    return custom_agent


We wrap up the system with runtime and customization capabilities. The main block lets us trigger a research run immediately, which is handy for testing the pipeline on a real-world topic such as Artificial Intelligence in Healthcare. For more dynamic use, interactive_research_session() supports multiple topic queries in a loop, simulating real-time exploration. Finally, the create_custom_agent() function lets us plug in new agents with unique roles and instructions, making the framework flexible and extensible for specialized workflows.
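As a quick illustration of create_custom_agent() (our own example, not part of the original code), here is a sketch that builds a hypothetical "Fact Checker" agent and runs it as a single-node graph, bypassing the supervisor; the role, instructions, and topic are example values.

# Hypothetical custom agent wired into a minimal one-node graph.
llm = create_llm()
fact_checker = create_custom_agent(
    role="Fact Checker",
    instructions="Verify the claims made so far and flag anything that looks unsupported.",
    llm=llm,
)

mini_workflow = StateGraph(AgentState)
mini_workflow.add_node("fact_checker", fact_checker)
mini_workflow.set_entry_point("fact_checker")
mini_workflow.add_edge("fact_checker", END)   # single node, so go straight to END
mini_app = mini_workflow.compile()

state = mini_app.invoke({
    "messages": [HumanMessage(content="Claim: LangGraph supports cyclic graphs.")],
    "next": "",
    "current_agent": "start",
    "research_topic": "LangGraph capabilities",
    "findings": {},
    "final_report": ""
})
print(state["messages"][-1].content[:200])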

def visualize_graph():
    """Visualize the research team graph structure"""

    try:
        app = compile_research_team()

        graph_repr = app.get_graph()

        print("🗺️  Research Team Graph Structure")
        print("=" * 40)
        print(f"Nodes: {list(graph_repr.nodes.keys())}")
        print(f"Edges: {[(edge.source, edge.target) for edge in graph_repr.edges]}")

        try:
            graph_repr.draw_mermaid()
        except Exception:
            print("📊 Visual graph requires the mermaid-py package")
            print("Install with: !pip install mermaid-py")

    except Exception as e:
        print(f"❌ Error visualizing graph: {str(e)}")




import time
from datetime import datetime


def monitor_research_performance(topic: str):
    """Monitor and report performance metrics"""

    start_time = time.time()
    print(f"⏱️  Starting performance monitoring for: {topic}")

    result = run_research_team(topic, f"perf_test_{int(time.time())}")

    end_time = time.time()
    duration = end_time - start_time

    metrics = {
        "duration": duration,
        "total_messages": len(result["messages"]) if result else 0,
        "findings_sections": len(result["findings"]) if result else 0,
        "report_length": len(result["final_report"]) if result and result["final_report"] else 0,
        "success": result is not None
    }

    print("\n📊 PERFORMANCE METRICS")
    print("=" * 30)
    print(f"⏱️  Duration: {duration:.2f} seconds")
    print(f"💬 Total Messages: {metrics['total_messages']}")
    print(f"📋 Findings Sections: {metrics['findings_sections']}")
    print(f"📄 Report Length: {metrics['report_length']} chars")
    print(f"✅ Success: {metrics['success']}")

    return metrics




def quick_start_demo():
    """Full demo of the research team system"""

    print("🚀 LangGraph Research Team - Quick Start Demo")
    print("=" * 50)

    topics = [
        "Climate Change Impact on Agriculture",
        "Quantum Computing Applications",
        "Digital Privacy in the Modern Age"
    ]

    for i, topic in enumerate(topics, 1):
        print(f"\n🔍 Demo {i}: {topic}")
        print("-" * 40)

        try:
            result = run_research_team(topic, f"demo_{i}")

            if result and result['final_report']:
                print(f"✅ Research completed successfully!")
                print(f"📊 Report preview: {result['final_report'][:150]}...")
            else:
                print("❌ Research failed")

        except Exception as e:
            print(f"❌ Error in demo {i}: {str(e)}")

        print("\n" + "=" * 30)

    print("🎉 Demo completed!")


quick_start_demo()

We finalize the system by adding utilities for graph visualization, performance monitoring, and a quick-start demo. The visualize_graph() function provides a structural overview of agent connections, which is useful for debugging or presentations. monitor_research_performance() tracks runtime, message volume, and report size, helping us evaluate the system’s efficiency. Finally, quick_start_demo() runs several sample research topics in sequence, showing how the agents collaborate to generate insightful reports.
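If we want the actual diagram rather than the printed node and edge lists, one option is to dump the Mermaid source to a file and render it with any Mermaid tool. This is a sketch under the assumption that draw_mermaid() returns the diagram source as a string, as in recent LangGraph releases; the output filename is our own choice.

# Dump the Mermaid source for the compiled graph so it can be rendered elsewhere.
graph_repr = compile_research_team().get_graph()
mermaid_src = graph_repr.draw_mermaid()   # Mermaid diagram as a string

with open("research_team_graph.mmd", "w") as f:
    f.write(mermaid_src)

print(mermaid_src)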

In conclusion, we have built and tested a fully functional, modular AI research assistant framework using LangGraph. With clear agent roles and automated task routing, we streamline research from a raw topic input to a well-structured final report. Whether we use the quick-start demo, run interactive sessions, or monitor performance, this system lets us tackle complex research tasks with minimal intervention. We are now equipped to adapt or extend this setup further by integrating custom agents, visualizing workflows, or deploying it in real-world applications.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform draws over 2 million monthly views, illustrating its popularity among readers.
