22 lutego, 2025

„Tworzenie Systemu opartego na Agentach AI z LangGraph: Przewodnik krok po kroku – Dodawanie Persistencji i Streamingu”

W poprzednim poradniku stworzyliśmy agenta AI zdolnego do przeszukiwania sieci i udzielania odpowiedzi na pytania użytkowników. Jednak w przypadku bardziej złożonych i długotrwałych zadań kluczowe stają się dwa aspekty: persistence (trwałość) i streaming (strumieniowanie). Trwałość pozwala na zapisanie stanu agenta w dowolnym momencie, dzięki czemu można wznowić interakcję w przyszłości od tego samego miejsca. Jest to szczególnie istotne w długotrwałych aplikacjach. Z kolei strumieniowanie umożliwia przesyłanie w czasie rzeczywistym informacji o działaniach agenta, co zwiększa przejrzystość i kontrolę nad jego operacjami. W tym artykule omówimy, jak dodać te dwie funkcje do naszego agenta, znacząco rozszerzając jego możliwości.

Konfiguracja Agenta

Pierwszym krokiem jest odtworzenie naszego agenta w nowej formie. W tym celu załadujemy odpowiednie zmienne środowiskowe, zainstalujemy i zaimportujemy wymagane biblioteki, skonfigurujemy narzędzie wyszukiwawcze Tavily, zdefiniujemy stan agenta oraz zbudujemy jego strukturę.

Do zainstalowania potrzebnych pakietów należy użyć poniższej komendy:

bash
pip install langgraph==0.2.53 langgraph-checkpoint==2.0.6 langgraph-sdk==0.1.36 langchain-groq langchain-community langgraph-checkpoint-sqlite==2.0.1

Następnie importujemy niezbędne moduły i definiujemy klasę agenta:

python
import os
os.environ['TAVILY_API_KEY'] = ""
os.environ['GROQ_API_KEY'] = ""

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_groq import ChatGroq
from langchain_community.tools.tavily_search import TavilySearchResults

tool = TavilySearchResults(max_results=2)

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

class Agent:
    def __init__(self, model, tools, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile()
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

Dodanie Funkcji Trwałości

Aby zapewnić trwałość, wykorzystamy funkcję checkpointer z biblioteki LangGraph. Checkpointer umożliwia zapis stanu agenta po wykonaniu każdego kroku. W naszym przypadku użyjemy SqliteSaver, który korzysta z bazy danych SQLite. Dzięki temu możemy przechowywać dane na lokalnym dysku lub w zewnętrznej bazie.

Kod konfiguracji SQLite wygląda następująco:

python
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
sqlite_conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(sqlite_conn)

Aby agent mógł korzystać z trwałości, musimy zmodyfikować jego konstruktor:

python
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        # Reszta kodu bez zmian
        self.graph = graph.compile(checkpointer=checkpointer)

Po tej modyfikacji możemy utworzyć agenta z włączoną funkcją trwałości:

python
prompt = """Jesteś inteligentnym asystentem badawczym. Korzystaj z wyszukiwarki, aby znaleźć potrzebne informacje. 
Możesz wykonywać wiele zapytań (jednocześnie lub sekwencyjnie). 
Używaj wyszukiwania tylko wtedy, gdy masz pewność, czego szukasz."""
model = ChatGroq(model="Llama-3.3-70b-Specdec")
bot = Agent(model, [tool], system=prompt, checkpointer=memory)

Dodanie Funkcji Strumieniowania

Strumieniowanie umożliwia przesyłanie w czasie rzeczywistym informacji o działaniach agenta. W tym celu skupimy się na dwóch rodzajach strumieniowania:

1. Strumieniowanie wiadomości – przesyłanie pośrednich wyników, takich jak decyzje AI czy rezultaty narzędzi.
2. Strumieniowanie tokenów – przesyłanie pojedynczych tokenów z odpowiedzi modelu.

Przykład Strumieniowania Wiadomości

Poniżej przedstawiamy kod, który pozwala na obserwowanie działań agenta w czasie rzeczywistym:

python
messages = [HumanMessage(content="Jaka jest pogoda w Teksasie?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

Dzięki temu możemy zobaczyć kolejne kroki agenta, takie jak decyzje o wyszukiwaniu informacji i ich wyniki.

Przykład Strumieniowania Tokenów

Aby strumieniować tokeny, używamy metody astream_events. Jest to funkcja asynchroniczna, co wymaga zastosowania async checkpointera:

python
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    messages = [HumanMessage(content="Jaka jest pogoda w San Francisco?")]
    thread = {"configurable": {"thread_id": "4"}}
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="|")

Podsumowanie

Dodanie trwałości i strumieniowania znacząco podnosi funkcjonalność agenta AI. Trwałość pozwala na zachowanie kontekstu w dłuższych interakcjach, co jest niezbędne w aplikacjach produkcyjnych. Z kolei strumieniowanie umożliwia obserwowanie działań agenta w czasie rzeczywistym, co jest szczególnie przydatne w aplikacjach z udziałem wielu użytkowników.

W kolejnych poradnikach zajmiemy się interakcjami z udziałem człowieka, w których trwałość odegra kluczową rolę w zapewnieniu płynności współpracy między ludźmi a agentami AI.