import os from dotenv import load_dotenv from langgraph.graph import START, StateGraph, MessagesState from langgraph.prebuilt import tools_condition from langgraph.prebuilt import ToolNode from langchain_google_genai import ChatGoogleGenerativeAI from langchain_huggingface import HuggingFaceEmbeddings from langchain_community.tools.tavily_search import TavilySearchResults from langchain_community.document_loaders import WikipediaLoader from langchain_community.document_loaders import ArxivLoader from langchain_core.messages import SystemMessage, HumanMessage, AIMessage from langchain_core.tools import tool from langchain.tools.retriever import create_retriever_tool from langchain_community.vectorstores import FAISS from langchain_core.documents import Document import shutil import pandas as pd # Ny import för pandas import json # För att parsa metadata-kolumnen load_dotenv() # Tools: @tool def multiply(a: int, b: int) -> int: """Multiply two numbers. Args: a: first int b: second int """ return a * b @tool def add(a: int, b: int) -> int: """Add two numbers. Args: a: first int b: second int """ return a + b @tool def subtract(a: int, b: int) -> int: """Subtract two numbers. Args: a: first int b: second int """ return a - b @tool def divide(a: int, b: int) -> int: """Divide two numbers. Args: a: first int b: second int """ if b == 0: raise ValueError("Cannot divide by zero.") return a / b @tool def modulus(a: int, b: int) -> int: """Get the modulus of two numbers. Args: a: first int b: second int """ return a % b @tool def wiki_search(query: str) -> str: """Search Wikipedia for a query and return maximum 2 results. Args: query: The search query.""" search_docs = WikipediaLoader(query=query, load_max_docs=2).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ]) return {"wiki_results": formatted_search_docs} @tool def web_search(query: str) -> str: """Search Tavily for a query and return maximum 3 results. Args: query: The search query.""" search_docs = TavilySearchResults(max_results=3).invoke(query=query) formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ]) return {"web_results": formatted_search_docs} @tool def arvix_search(query: str) -> str: """Search Arxiv for a query and return maximum 3 result. Args: query: The search query.""" search_docs = ArxivLoader(query=query, load_max_docs=3).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content[:1000]}\n' for doc in search_docs ]) return {"arvix_results": formatted_search_docs} # load the system prompt from the file with open("system_prompt.txt", "r", encoding="utf-8") as f: system_prompt = f.read() # Retrieval INDEX_DIR = "./faiss_index" CSV_PATH = "./supabase_docs.csv" # Use a lightweight 384-dim model to avoid excessive RAM and potential onnxruntime crashes EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2" _SIMILARITY_THRESHOLD = 0.2 # lower distance means more similar embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL, model_kwargs={"device": "cpu"}) if os.path.exists(INDEX_DIR): print(f"Loading existing FAISS index from {INDEX_DIR}") vector_store = FAISS.load_local(INDEX_DIR, embeddings, allow_dangerous_deserialization=True) else: print(f"Creating new FAISS index at {INDEX_DIR}, and loading documents from {CSV_PATH}") if os.path.exists(INDEX_DIR): shutil.rmtree(INDEX_DIR) os.makedirs(INDEX_DIR) if not os.path.exists(CSV_PATH): raise FileNotFoundError(f"CSV file {CSV_PATH} does not exist") df = pd.read_csv(CSV_PATH) documents = [] for i, row in df.iterrows(): content = row["content"] question_part = content.split("Final answer :")[0].strip() final_answer_part = content.split("Final answer :")[-1].strip() if "Final answer :" in content else "" try: metadata = json.loads(row["metadata"].replace("'", '"')) except json.JSONDecodeError: metadata = {} metadata["final_answer"] = final_answer_part documents.append(Document(page_content=question_part, metadata=metadata)) # Simple progress indicator every 200 docs if (i + 1) % 200 == 0: print(f"Prepared {i + 1}/{len(df)} documents for embedding…") if not documents: print("No documents loaded from CSV. FAISS index will be empty.") vector_store = FAISS.from_documents(documents=[], embedding=embeddings) vector_store.save_local(INDEX_DIR) else: print("Embedding documents and building FAISS index — this may take a few minutes…") vector_store = FAISS.from_documents(documents=documents, embedding=embeddings) vector_store.save_local(INDEX_DIR) print(f"FAISS index built and saved with {len(documents)} documents from CSV.") # Retriever tool retriever_tool = create_retriever_tool( retriever=vector_store.as_retriever(), name="Question_Search", description="Retrieve similar questions from FAISS index; metadata includes 'final_answer'." ) # Agent tools = [ multiply, add, subtract, divide, modulus, wiki_search, web_search, arvix_search, retriever_tool, ] def build_graph_agent(): llm = ChatGoogleGenerativeAI( model="gemini-1.5-flash", temperature=0.0, ) llm_with_tools = llm.bind_tools(tools) def assistant(state: MessagesState): return { "messages": [llm_with_tools.invoke(state["messages"])], } def retriever(state: MessagesState): query = state["messages"][-1].content similar_docs = vector_store.similarity_search(query, k=3) if similar_docs: similar_doc = similar_docs[0] if "final_answer" in similar_doc.metadata and similar_doc.metadata["final_answer"]: answer = similar_doc.metadata["final_answer"] elif "Final answer :" in similar_doc.page_content: answer = similar_doc.page_content.split("Final answer :")[-1].strip() else: answer = similar_doc.page_content.strip() return {"messages": [AIMessage(content=answer)]} else: return {"messages": [AIMessage(content="No similar questions found in the knowledge base.")]} builder = StateGraph(MessagesState) builder.add_node("retriever", retriever) builder.set_entry_point("retriever") builder.set_finish_point("retriever") return builder.compile()