HomeArtificial IntelligenceConstructing a GPU-Accelerated Ollama LangChain Workflow with RAG Brokers, Multi-Session Chat Efficiency...

Constructing a GPU-Accelerated Ollama LangChain Workflow with RAG Brokers, Multi-Session Chat Efficiency Monitoring


On this tutorial, we construct a GPU‑succesful native LLM stack that unifies Ollama and LangChain. We set up the required libraries, launch the Ollama server, pull a mannequin, and wrap it in a customized LangChain LLM, permitting us to manage temperature, token limits, and context. We add a Retrieval-Augmented Technology layer that ingests PDFs or textual content, chunks them, embeds them with Sentence-Transformers, and serves grounded solutions. We handle multi‑session chat reminiscence, register instruments (net search + RAG question), and spin up an agent that causes about when to name them.

import os
import sys
import subprocess
import time
import threading
import queue
import json
from typing import Listing, Dict, Any, Optionally available, Tuple
from dataclasses import dataclass
from contextlib import contextmanager
import asyncio
from concurrent.futures import ThreadPoolExecutor


def install_packages():
    """Set up required packages for Colab surroundings"""
    packages = [
        "langchain",
        "langchain-community",
        "langchain-core",
        "chromadb",
        "sentence-transformers",
        "faiss-cpu",
        "pypdf",
        "python-docx",
        "requests",
        "psutil",
        "pyngrok",
        "gradio"
    ]
   
    for bundle in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])


install_packages()


import requests
import psutil
import threading
from queue import Queue
from langchain.llms.base import LLM
from langchain.callbacks.supervisor import CallbackManagerForLLMRun
from langchain.schema import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain.reminiscence import ConversationBufferWindowMemory, ConversationSummaryBufferMemory
from langchain.chains import ConversationChain, RetrievalQA
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS, Chroma
from langchain.brokers import AgentType, initialize_agent, Instrument
from langchain.instruments import DuckDuckGoSearchRun

We import the mandatory Python utilities in Colab for concurrency, system calls, and JSON dealing with. We outline and run install_packages() to tug LangChain, embeddings, vector shops, doc loaders, monitoring, and UI dependencies. We then import LangChain LLM, reminiscence, retrieval, and agent instruments (together with DuckDuckGo search) to construct an extensible RAG and agent workflow.

[Download the full codes with notebook here]

@dataclass
class OllamaConfig:
    """Configuration for Ollama setup"""
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    max_tokens: int = 2048
    temperature: float = 0.7
    gpu_layers: int = -1  
    context_window: int = 4096
    batch_size: int = 512
    threads: int = 4

We outline an OllamaConfig dataclass so we maintain all Ollama runtime settings in a single clear place. We set the mannequin title and native API endpoint, in addition to the technology habits (max_tokens, temperature, and context_window). We management efficiency with gpu_layers (‑1 = load all to GPU when doable), batch_size, and threads for parallelism.

@dataclass
class OllamaConfig:
    """Configuration for Ollama setup"""
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    max_tokens: int = 2048
    temperature: float = 0.7
    gpu_layers: int = -1  
    context_window: int = 4096
    batch_size: int = 512
    threads: int = 4
We outline an OllamaConfig dataclass so we maintain all Ollama runtime settings in a single clear place. We set the mannequin title and native API endpoint, in addition to the technology habits (max_tokens, temperature, and context_window). We management efficiency with gpu_layers (‑1 = load all to GPU when doable), batch_size, and threads for parallelism.

class OllamaManager:
    """Superior Ollama supervisor for Colab surroundings"""
   
    def __init__(self, config: OllamaConfig):
        self.config = config
        self.course of = None
        self.is_running = False
        self.models_cache = {}
        self.performance_monitor = PerformanceMonitor()
       
    def install_ollama(self):
        """Set up Ollama in Colab surroundings"""
        attempt:
            subprocess.run([
                "curl", "-fsSL", "https://ollama.com/install.sh", "-o", "/tmp/install.sh"
            ], test=True)
           
            subprocess.run(["bash", "/tmp/install.sh"], test=True)
            print("✅ Ollama put in efficiently")
           
        besides subprocess.CalledProcessError as e:
            print(f"❌ Failed to put in Ollama: {e}")
            elevate
   
    def start_server(self):
        """Begin Ollama server with GPU assist"""
        if self.is_running:
            print("Ollama server is already working")
            return
           
        attempt:
            env = os.environ.copy()
            env["OLLAMA_NUM_PARALLEL"] = str(self.config.threads)
            env["OLLAMA_MAX_LOADED_MODELS"] = "3"
           
            self.course of = subprocess.Popen(
                ["ollama", "serve"],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
           
            time.sleep(5)
           
            if self.health_check():
                self.is_running = True
                print("✅ Ollama server began efficiently")
                self.performance_monitor.begin()
            else:
                elevate Exception("Server failed to begin correctly")
               
        besides Exception as e:
            print(f"❌ Failed to begin Ollama server: {e}")
            elevate
   
    def health_check(self) -> bool:
        """Verify if Ollama server is wholesome"""
        attempt:
            response = requests.get(f"{self.config.base_url}/api/tags", timeout=10)
            return response.status_code == 200
        besides:
            return False
   
    def pull_model(self, model_name: str) -> bool:
        """Pull a mannequin from Ollama registry"""
        attempt:
            print(f"🔄 Pulling mannequin: {model_name}")
            end result = subprocess.run(
                ["ollama", "pull", model_name],
                capture_output=True,
                textual content=True,
                timeout=1800  
            )
           
            if end result.returncode == 0:
                print(f"✅ Mannequin {model_name} pulled efficiently")
                self.models_cache[model_name] = True
                return True
            else:
                print(f"❌ Failed to tug mannequin {model_name}: {end result.stderr}")
                return False
               
        besides subprocess.TimeoutExpired:
            print(f"❌ Timeout pulling mannequin {model_name}")
            return False
        besides Exception as e:
            print(f"❌ Error pulling mannequin {model_name}: {e}")
            return False
   
    def list_models(self) -> Listing[str]:
        """Listing out there native fashions"""
        attempt:
            end result = subprocess.run(
                ["ollama", "list"],
                capture_output=True,
                textual content=True
            )
           
            fashions = []
            for line in end result.stdout.break up('n')[1:]:
                if line.strip():
                    model_name = line.break up()[0]
                    fashions.append(model_name)
                   
            return fashions
           
        besides Exception as e:
            print(f"❌ Error itemizing fashions: {e}")
            return []
   
    def stop_server(self):
        """Cease Ollama server"""
        if self.course of:
            self.course of.terminate()
            self.course of.wait()
            self.is_running = False
            self.performance_monitor.cease()
            print("✅ Ollama server stopped")

We create the OllamaManager class to put in, begin, monitor, and handle the Ollama server within the Colab surroundings. We set surroundings variables for GPU parallelism, run the server within the background, and confirm it’s up with a well being test. We pull fashions on demand, cache them, listing out there ones regionally, and gracefully shut down the server when the duty is full, all whereas monitoring efficiency.

[Download the full codes with notebook here]

class PerformanceMonitor:
    """Monitor system efficiency and useful resource utilization"""
   
    def __init__(self):
        self.monitoring = False
        self.stats = {
            "cpu_usage": [],
            "memory_usage": [],
            "gpu_usage": [],
            "inference_times": []
        }
        self.monitor_thread = None
   
    def begin(self):
        """Begin efficiency monitoring"""
        self.monitoring = True
        self.monitor_thread = threading.Thread(goal=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.begin()
   
    def cease(self):
        """Cease efficiency monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.be part of()
   
    def _monitor_loop(self):
        """Most important monitoring loop"""
        whereas self.monitoring:
            attempt:
                cpu_percent = psutil.cpu_percent(interval=1)
                reminiscence = psutil.virtual_memory()
               
                self.stats["cpu_usage"].append(cpu_percent)
                self.stats["memory_usage"].append(reminiscence.%)
               
                for key in ["cpu_usage", "memory_usage"]:
                    if len(self.stats[key]) > 100:
                        self.stats[key] = self.stats[key][-100:]
               
                time.sleep(5)
               
            besides Exception as e:
                print(f"Monitoring error: {e}")
   
    def get_stats(self) -> Dict[str, Any]:
        """Get present efficiency statistics"""
        return {
            "avg_cpu": sum(self.stats["cpu_usage"][-10:]) / max(len(self.stats["cpu_usage"][-10:]), 1),
            "avg_memory": sum(self.stats["memory_usage"][-10:]) / max(len(self.stats["memory_usage"][-10:]), 1),
            "total_inferences": len(self.stats["inference_times"]),
            "avg_inference_time": sum(self.stats["inference_times"]) / max(len(self.stats["inference_times"]), 1)
        }

We outline a PerformanceMonitor class to trace CPU, reminiscence, and inference instances in real-time whereas the Ollama server runs. We launch a background thread to gather stats each few seconds, retailer current metrics, and supply common utilization summaries. This helps us monitor system load and optimize efficiency throughout mannequin inference.

[Download the full codes with notebook here]

class OllamaLLM(LLM):
    """Customized LangChain LLM for Ollama"""
   
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    temperature: float = 0.7
    max_tokens: int = 2048
    performance_monitor: Optionally available[PerformanceMonitor] = None
   
    @property
    def _llm_type(self) -> str:
        return "ollama"
   
    def _call(
        self,
        immediate: str,
        cease: Optionally available[List[str]] = None,
        run_manager: Optionally available[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Make API name to Ollama"""
        start_time = time.time()
       
        attempt:
            payload = {
                "mannequin": self.model_name,
                "immediate": immediate,
                "stream": False,
                "choices": {
                    "temperature": self.temperature,
                    "num_predict": self.max_tokens,
                    "cease": cease or []
                }
            }
           
            response = requests.submit(
                f"{self.base_url}/api/generate",
                json=payload,
                timeout=120
            )
           
            response.raise_for_status()
            end result = response.json()
           
            inference_time = time.time() - start_time
           
            if self.performance_monitor:
                self.performance_monitor.stats["inference_times"].append(inference_time)
           
            return end result.get("response", "")
           
        besides Exception as e:
            print(f"❌ Ollama API error: {e}")
            return f"Error: {str(e)}"

We wrap the Ollama API inside a customized OllamaLLM class appropriate with LangChain’s LLM interface. We outline how prompts are despatched to the Ollama server and document every inference time for efficiency monitoring. This lets us plug Ollama instantly into LangChain chains, brokers, and reminiscence elements whereas monitoring effectivity.

class RAGSystem:
    """Retrieval-Augmented Technology system"""
   
    def __init__(self, llm: OllamaLLM, embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.llm = llm
        self.embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        self.vector_store = None
        self.qa_chain = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )
   
    def add_documents(self, file_paths: Listing[str]):
        """Add paperwork to the vector retailer"""
        paperwork = []
       
        for file_path in file_paths:
            attempt:
                if file_path.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                else:
                    loader = TextLoader(file_path)
               
                docs = loader.load()
                paperwork.prolong(docs)
               
            besides Exception as e:
                print(f"❌ Error loading {file_path}: {e}")
       
        if paperwork:
            splits = self.text_splitter.split_documents(paperwork)
           
            if self.vector_store is None:
                self.vector_store = FAISS.from_documents(splits, self.embeddings)
            else:
                self.vector_store.add_documents(splits)
           
            self.qa_chain = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",
                retriever=self.vector_store.as_retriever(search_kwargs={"ok": 3}),
                return_source_documents=True
            )
           
            print(f"✅ Added {len(splits)} doc chunks to vector retailer")
   
    def question(self, query: str) -> Dict[str, Any]:
        """Question the RAG system"""
        if not self.qa_chain:
            return {"reply": "No paperwork loaded. Please add paperwork first."}
       
        attempt:
            end result = self.qa_chain({"question": query})
            return {
                "reply": end result["result"],
                "sources": [doc.metadata for doc in result.get("source_documents", [])]
            }
        besides Exception as e:
            return {"reply": f"Error: {str(e)}"}

We use ConversationManager to handle multi-session reminiscence, enabling each buffer-based and summary-based chat histories for every session. Then, in OllamaLangChainSystem, we carry all elements collectively, server, LLM, RAG, reminiscence, instruments, and brokers, into one unified interface. We configure the system to put in Ollama, pull fashions, construct brokers with instruments like net search and RAG, and expose chat, doc add, and model-switching capabilities for seamless interplay.

class ConversationManager:
    """Handle dialog historical past and reminiscence"""
   
    def __init__(self, llm: OllamaLLM, memory_type: str = "buffer"):
        self.llm = llm
        self.conversations = {}
        self.memory_type = memory_type
       
    def get_conversation(self, session_id: str) -> ConversationChain:
        """Get or create dialog for session"""
        if session_id not in self.conversations:
            if self.memory_type == "buffer":
                reminiscence = ConversationBufferWindowMemory(ok=10)
            elif self.memory_type == "abstract":
                reminiscence = ConversationSummaryBufferMemory(
                    llm=self.llm,
                    max_token_limit=1000
                )
            else:
                reminiscence = ConversationBufferWindowMemory(ok=10)
           
            self.conversations[session_id] = ConversationChain(
                llm=self.llm,
                reminiscence=reminiscence,
                verbose=True
            )
       
        return self.conversations[session_id]
   
    def chat(self, session_id: str, message: str) -> str:
        """Chat with particular session"""
        dialog = self.get_conversation(session_id)
        return dialog.predict(enter=message)
   
    def clear_session(self, session_id: str):
        """Clear dialog historical past for session"""
        if session_id in self.conversations:
            del self.conversations[session_id]


class OllamaLangChainSystem:
    """Most important system integrating all elements"""
   
    def __init__(self, config: OllamaConfig):
        self.config = config
        self.supervisor = OllamaManager(config)
        self.llm = None
        self.rag_system = None
        self.conversation_manager = None
        self.instruments = []
        self.agent = None
       
    def setup(self):
        """Full system setup"""
        print("🚀 Organising Ollama + LangChain system...")
       
        self.supervisor.install_ollama()
        self.supervisor.start_server()
       
        if not self.supervisor.pull_model(self.config.model_name):
            print("❌ Failed to tug default mannequin")
            return False
       
        self.llm = OllamaLLM(
            model_name=self.config.model_name,
            base_url=self.config.base_url,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens,
            performance_monitor=self.supervisor.performance_monitor
        )
       
        self.rag_system = RAGSystem(self.llm)
       
        self.conversation_manager = ConversationManager(self.llm)
       
        self._setup_tools()
       
        print("✅ System setup full!")
        return True
   
    def _setup_tools(self):
        """Setup instruments for the agent"""
        search = DuckDuckGoSearchRun()
       
        self.instruments = [
            Tool(
                name="Search",
                func=search.run,
                description="Search the internet for current information"
            ),
            Tool(
                name="RAG_Query",
                func=lambda q: self.rag_system.query(q)["answer"],
                description="Question loaded paperwork utilizing RAG"
            )
        ]
       
        self.agent = initialize_agent(
            instruments=self.instruments,
            llm=self.llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True
        )
   
    def chat(self, message: str, session_id: str = "default") -> str:
        """Easy chat interface"""
        return self.conversation_manager.chat(session_id, message)
   
    def rag_chat(self, query: str) -> Dict[str, Any]:
        """RAG-based chat"""
        return self.rag_system.question(query)
   
    def agent_chat(self, message: str) -> str:
        """Agent-based chat with instruments"""
        return self.agent.run(message)
   
    def switch_model(self, model_name: str) -> bool:
        """Swap to totally different mannequin"""
        if self.supervisor.pull_model(model_name):
            self.llm.model_name = model_name
            print(f"✅ Switched to mannequin: {model_name}")
            return True
        return False
   
    def load_documents(self, file_paths: Listing[str]):
        """Load paperwork into RAG system"""
        self.rag_system.add_documents(file_paths)
   
    def get_performance_stats(self) -> Dict[str, Any]:
        """Get system efficiency statistics"""
        return self.supervisor.performance_monitor.get_stats()
   
    def cleanup(self):
        """Clear up assets"""
        self.supervisor.stop_server()
        print("✅ System cleanup full")

We use the ConversationManager to take care of separate chat classes, every with its reminiscence kind, both buffer-based or summary-based, permitting us to protect or summarize context as wanted. Within the OllamaLangChainSystem, we combine every little thing: we set up and launch Ollama, pull the specified mannequin, wrap it in a LangChain-compatible LLM, join a RAG system, initialize chat reminiscence, and register exterior instruments like net search.

def principal():
    """Most important perform demonstrating the system"""
   
    config = OllamaConfig(
        model_name="llama2",
        temperature=0.7,
        max_tokens=2048
    )
   
    system = OllamaLangChainSystem(config)
   
    attempt:
        if not system.setup():
            return
       
        print("n🗣️ Testing fundamental chat:")
        response = system.chat("Hiya! How are you?")
        print(f"Response: {response}")
       
        print("n🔄 Testing mannequin switching:")
        fashions = system.supervisor.list_models()
        print(f"Out there fashions: {fashions}")
       
       
        print("n🤖 Testing agent:")
        agent_response = system.agent_chat("What is the present climate like?")
        print(f"Agent Response: {agent_response}")
       
        print("n📊 Efficiency Statistics:")
        stats = system.get_performance_stats()
        print(json.dumps(stats, indent=2))
       
    besides KeyboardInterrupt:
        print("n⏹️ Interrupted by person")
    besides Exception as e:
        print(f"❌ Error: {e}")
    lastly:
        system.cleanup()


def create_gradio_interface(system: OllamaLangChainSystem):
    """Create a Gradio interface for simple interplay"""
    attempt:
        import gradio as gr
       
        def chat_interface(message, historical past, mode):
            if mode == "Primary Chat":
                response = system.chat(message)
            elif mode == "RAG Chat":
                end result = system.rag_chat(message)
                response = end result["answer"]
            elif mode == "Agent Chat":
                response = system.agent_chat(message)
            else:
                response = "Unknown mode"
           
            historical past.append((message, response))
            return "", historical past
       
        def upload_docs(recordsdata):
            if recordsdata:
                file_paths = [f.name for f in files]
                system.load_documents(file_paths)
                return f"Loaded {len(file_paths)} paperwork into RAG system"
            return "No recordsdata uploaded"
       
        def get_stats():
            stats = system.get_performance_stats()
            return json.dumps(stats, indent=2)
       
        with gr.Blocks(title="Ollama + LangChain System") as demo:
            gr.Markdown("# 🦙 Ollama + LangChain Superior System")
           
            with gr.Tab("Chat"):
                chatbot = gr.Chatbot()
                mode = gr.Dropdown(
                    ["Basic Chat", "RAG Chat", "Agent Chat"],
                    worth="Primary Chat",
                    label="Chat Mode"
                )
                msg = gr.Textbox(label="Message")
                clear = gr.Button("Clear")
               
                msg.submit(chat_interface, [msg, chatbot, mode], [msg, chatbot])
                clear.click on(lambda: ([], ""), outputs=[chatbot, msg])
           
            with gr.Tab("Doc Add"):
                file_upload = gr.File(file_count="a number of", label="Add Paperwork")
                upload_btn = gr.Button("Add to RAG System")
                upload_status = gr.Textbox(label="Standing")
               
                upload_btn.click on(upload_docs, file_upload, upload_status)
           
            with gr.Tab("Efficiency"):
                stats_btn = gr.Button("Get Efficiency Stats")
                stats_output = gr.Textbox(label="Efficiency Statistics")
               
                stats_btn.click on(get_stats, outputs=stats_output)
       
        return demo
       
    besides ImportError:
        print("Gradio not put in. Skipping interface creation.")
        return None


if __name__ == "__main__":
    print("🚀 Ollama + LangChain System for Google Colab")
    print("=" * 50)
   
    principal()
   
    # Or create a system occasion for interactive use
    # config = OllamaConfig(model_name="llama2")
    # system = OllamaLangChainSystem(config)
    # system.setup()
   
    # # Create Gradio interface
    # demo = create_gradio_interface(system)
    # if demo:
    #     demo.launch(share=True)  # share=True for public hyperlink

We wrap every little thing up in the primary perform to run a full demo, organising the system, testing chat, agent instruments, mannequin itemizing, and efficiency statistics. Then, in create_gradio_interface(), we construct a user-friendly Gradio app with tabs for chatting, importing paperwork to the RAG system, and monitoring efficiency. Lastly, we name principal() within the __main__ block for direct Colab execution, or optionally launch the Gradio UI for interactive exploration and public sharing.

In conclusion, we’ve got a versatile playground: we change Ollama fashions, converse with buffered or abstract reminiscence, query our personal paperwork, attain out to go looking when context is lacking, and monitor fundamental useful resource stats to remain inside Colab limits. The code is modular, permitting us to increase the instrument listing, tune inference choices (temperature, most tokens, concurrency) in OllamaConfig, or adapt the RAG pipeline to bigger corpora or totally different embedding fashions. We launch the Gradio app with share=True to collaborate or embed these elements in our tasks. We now personal an extensible template for quick native LLM experimentation.


Try the Codes. All credit score for this analysis goes to the researchers of this challenge. SUBSCRIBE NOW to our AI Publication


Asif Razzaq is the CEO of Marktechpost Media Inc.. As a visionary entrepreneur and engineer, Asif is dedicated to harnessing the potential of Synthetic Intelligence for social good. His most up-to-date endeavor is the launch of an Synthetic Intelligence Media Platform, Marktechpost, which stands out for its in-depth protection of machine studying and deep studying information that’s each technically sound and simply comprehensible by a large viewers. The platform boasts of over 2 million month-to-month views, illustrating its reputation amongst audiences.

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

- Advertisment -
Google search engine

Most Popular

Recent Comments