Spaces:
Paused
Paused
| import os | |
| from dotenv import load_dotenv | |
| from langgraph.graph import START, StateGraph, MessagesState | |
| from langgraph.prebuilt import tools_condition | |
| from langgraph.prebuilt import ToolNode | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langchain_community.document_loaders import ArxivLoader | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
| from langchain_core.tools import tool | |
| from langchain.tools.retriever import create_retriever_tool | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_core.documents import Document | |
| import shutil | |
| import pandas as pd # Ny import för pandas | |
| import json # För att parsa metadata-kolumnen | |
| load_dotenv() | |
| # Tools: | |
| def multiply(a: int, b: int) -> int: | |
| """Multiply two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| return a * b | |
| def add(a: int, b: int) -> int: | |
| """Add two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| return a + b | |
| def subtract(a: int, b: int) -> int: | |
| """Subtract two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| return a - b | |
| def divide(a: int, b: int) -> int: | |
| """Divide two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| if b == 0: | |
| raise ValueError("Cannot divide by zero.") | |
| return a / b | |
| def modulus(a: int, b: int) -> int: | |
| """Get the modulus of two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| return a % b | |
| def wiki_search(query: str) -> str: | |
| """Search Wikipedia for a query and return maximum 2 results. | |
| Args: | |
| query: The search query.""" | |
| search_docs = WikipediaLoader(query=query, load_max_docs=2).load() | |
| formatted_search_docs = "\n\n---\n\n".join( | |
| [ | |
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
| for doc in search_docs | |
| ]) | |
| return {"wiki_results": formatted_search_docs} | |
| def web_search(query: str) -> str: | |
| """Search Tavily for a query and return maximum 3 results. | |
| Args: | |
| query: The search query.""" | |
| search_docs = TavilySearchResults(max_results=3).invoke(query=query) | |
| formatted_search_docs = "\n\n---\n\n".join( | |
| [ | |
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
| for doc in search_docs | |
| ]) | |
| return {"web_results": formatted_search_docs} | |
| def arvix_search(query: str) -> str: | |
| """Search Arxiv for a query and return maximum 3 result. | |
| Args: | |
| query: The search query.""" | |
| search_docs = ArxivLoader(query=query, load_max_docs=3).load() | |
| formatted_search_docs = "\n\n---\n\n".join( | |
| [ | |
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>' | |
| for doc in search_docs | |
| ]) | |
| return {"arvix_results": formatted_search_docs} | |
| # load the system prompt from the file | |
| with open("system_prompt.txt", "r", encoding="utf-8") as f: | |
| system_prompt = f.read() | |
| # Retrieval | |
| INDEX_DIR = "./faiss_index" | |
| CSV_PATH = "./supabase_docs.csv" | |
| # Use a lightweight 384-dim model to avoid excessive RAM and potential onnxruntime crashes | |
| EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2" | |
| _SIMILARITY_THRESHOLD = 0.2 # lower distance means more similar | |
| embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL, model_kwargs={"device": "cpu"}) | |
| if os.path.exists(INDEX_DIR): | |
| print(f"Loading existing FAISS index from {INDEX_DIR}") | |
| vector_store = FAISS.load_local(INDEX_DIR, embeddings, allow_dangerous_deserialization=True) | |
| else: | |
| print(f"Creating new FAISS index at {INDEX_DIR}, and loading documents from {CSV_PATH}") | |
| if os.path.exists(INDEX_DIR): | |
| shutil.rmtree(INDEX_DIR) | |
| os.makedirs(INDEX_DIR) | |
| if not os.path.exists(CSV_PATH): | |
| raise FileNotFoundError(f"CSV file {CSV_PATH} does not exist") | |
| df = pd.read_csv(CSV_PATH) | |
| documents = [] | |
| for i, row in df.iterrows(): | |
| content = row["content"] | |
| question_part = content.split("Final answer :")[0].strip() | |
| final_answer_part = content.split("Final answer :")[-1].strip() if "Final answer :" in content else "" | |
| try: | |
| metadata = json.loads(row["metadata"].replace("'", '"')) | |
| except json.JSONDecodeError: | |
| metadata = {} | |
| metadata["final_answer"] = final_answer_part | |
| documents.append(Document(page_content=question_part, metadata=metadata)) | |
| # Simple progress indicator every 200 docs | |
| if (i + 1) % 200 == 0: | |
| print(f"Prepared {i + 1}/{len(df)} documents for embedding…") | |
| if not documents: | |
| print("No documents loaded from CSV. FAISS index will be empty.") | |
| vector_store = FAISS.from_documents(documents=[], embedding=embeddings) | |
| vector_store.save_local(INDEX_DIR) | |
| else: | |
| print("Embedding documents and building FAISS index — this may take a few minutes…") | |
| vector_store = FAISS.from_documents(documents=documents, embedding=embeddings) | |
| vector_store.save_local(INDEX_DIR) | |
| print(f"FAISS index built and saved with {len(documents)} documents from CSV.") | |
| # Retriever tool | |
| retriever_tool = create_retriever_tool( | |
| retriever=vector_store.as_retriever(), | |
| name="Question_Search", | |
| description="Retrieve similar questions from FAISS index; metadata includes 'final_answer'." | |
| ) | |
| # Agent | |
| tools = [ | |
| multiply, | |
| add, | |
| subtract, | |
| divide, | |
| modulus, | |
| wiki_search, | |
| web_search, | |
| arvix_search, | |
| retriever_tool, | |
| ] | |
| def build_graph_agent(): | |
| llm = ChatGoogleGenerativeAI( | |
| model="gemini-1.5-flash", | |
| temperature=0.0, | |
| ) | |
| llm_with_tools = llm.bind_tools(tools) | |
| def assistant(state: MessagesState): | |
| return { | |
| "messages": [llm_with_tools.invoke(state["messages"])], | |
| } | |
| def retriever(state: MessagesState): | |
| query = state["messages"][-1].content | |
| similar_docs = vector_store.similarity_search(query, k=3) | |
| if similar_docs: | |
| similar_doc = similar_docs[0] | |
| if "final_answer" in similar_doc.metadata and similar_doc.metadata["final_answer"]: | |
| answer = similar_doc.metadata["final_answer"] | |
| elif "Final answer :" in similar_doc.page_content: | |
| answer = similar_doc.page_content.split("Final answer :")[-1].strip() | |
| else: | |
| answer = similar_doc.page_content.strip() | |
| return {"messages": [AIMessage(content=answer)]} | |
| else: | |
| return {"messages": [AIMessage(content="No similar questions found in the knowledge base.")]} | |
| builder = StateGraph(MessagesState) | |
| builder.add_node("retriever", retriever) | |
| builder.set_entry_point("retriever") | |
| builder.set_finish_point("retriever") | |
| return builder.compile() |