diff --git a/docs/openapi.json b/docs/openapi.json index 58f4be15..52b6980a 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -884,7 +884,7 @@ "type": "string", "title": "Session Id", "description": "Session ID for the MOS. This is used to distinguish between different dialogue", - "default": "5cc9f7d7-74c3-4bb3-9b4f-11adf416530b" + "default": "3d88949f-cbe1-4244-a2e1-d346e8b76ca0" }, "chat_model": { "$ref": "#/components/schemas/LLMConfigFactory", @@ -940,6 +940,12 @@ "title": "Enable Mem Scheduler", "description": "Enable memory scheduler for automated memory management", "default": false + }, + "PRO_MODE": { + "type": "boolean", + "title": "Pro Mode", + "description": "Enable PRO mode for complex query decomposition", + "default": false } }, "additionalProperties": false, diff --git a/examples/core_memories/textual_internet_memoy.py b/examples/core_memories/textual_internet_memoy.py new file mode 100644 index 00000000..21979e89 --- /dev/null +++ b/examples/core_memories/textual_internet_memoy.py @@ -0,0 +1,241 @@ +from memos import log +from memos.configs.embedder import EmbedderConfigFactory +from memos.configs.internet_retriever import InternetRetrieverConfigFactory +from memos.configs.mem_reader import SimpleStructMemReaderConfig +from memos.configs.memory import TreeTextMemoryConfig +from memos.embedders.factory import EmbedderFactory +from memos.mem_reader.simple_struct import SimpleStructMemReader +from memos.memories.textual.tree import TreeTextMemory + + +logger = log.get_logger(__name__) + + +embedder_config = EmbedderConfigFactory.model_validate( + { + "backend": "ollama", + "config": { + "model_name_or_path": "nomic-embed-text:latest", + }, + } +) +embedder = EmbedderFactory.from_config(embedder_config) + + +def embed_memory_item(memory: str) -> list[float]: + return embedder.embed([memory])[0] + + +# google or xinyu pelase change tree_config_with_internet api key or token or url +tree_config = TreeTextMemoryConfig.from_json_file("examples/data/config/tree_config.json") +interconfig = InternetRetrieverConfigFactory( + backend="xinyu", + config={ + "api_key": "google or xinyu token", + "search_engine_id": "google engine id or xinyu url", + "max_results": 15, + "num_per_request": 10, + }, +) +tree_config.internet_retriever = interconfig + +my_tree_textual_memory = TreeTextMemory(tree_config) +my_tree_textual_memory.delete_all() + +# Create a memory reader instance +reader_config = SimpleStructMemReaderConfig.from_json_file( + "examples/data/config/simple_struct_reader_config.json" +) +reader = SimpleStructMemReader(reader_config) + +scene_data = [ + [ + {"role": "user", "chat_time": "3 May 2025", "content": "I’m feeling a bit down today."}, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "I’m sorry to hear that. Do you want to talk about what’s been going on?", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "It’s just been a tough couple of days, you know? Everything feels a bit overwhelming, and I just can’t seem to shake it off.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "It sounds like you're going through a lot right now. Sometimes it helps to talk things out. Is there something specific that's been weighing on you, or is it more of a general feeling?", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "It’s a mix, I guess. 
Work’s been really stressful, and on top of that, I’ve been feeling kinda disconnected from the people around me.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "That can be really draining, especially when you’re feeling isolated on top of the stress. Do you think there’s something from your past that’s contributing to how you’re feeling now? Sometimes our emotions are tied to older experiences.", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "Yeah, now that you mention it… I’ve been thinking a lot about my childhood lately. I didn’t have the easiest upbringing. My parents were always busy, and I often felt like I had to figure things out on my own. I remember one winter, I got really sick with a high fever. My parents were both working, and I didn’t want to bother them. I just lay in bed, crying by myself, not knowing what to do.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "That must have been very lonely. Growing up without that support can really make you feel like you have to handle everything on your own. How do you think that experience has affected you now?", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "Yeah, it was really lonely. I remember spending a lot of time doing things on my own, like drawing, because it was my only way to escape. I used to draw animals, and I created my own little world on paper. It was peaceful in a way, just me and my imagination, with no outside distractions.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "Drawing sounds like a beautiful way to express yourself and find some peace in that solitude. It must have been a very quiet and private world for you. Do you think that sense of loneliness from back then has shaped how you connect with others today?", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "I think it has. I’ve always found it hard to let others in. I feel like I have to handle everything on my own. If I rely on someone else, I feel vulnerable, and I start to feel that loneliness again.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "It sounds like this was a protective mechanism that helped you cope with your feelings of loneliness back then. But as you’ve grown, it seems like this pattern might not be serving you as well anymore. Have you thought about trying to break this cycle, or finding new ways to connect with others?", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "I’ve thought about it, though it’s hard. But I also know I can’t keep going like this forever. Maybe it’s time to seek some help.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "It’s a big step, but a positive one. Exploring your emotions and past experiences could really help you understand how they’ve shaped your present. Therapy could be a great place to start, as it would help you connect with both yourself and others. Would you be open to trying it?", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "I’m open to trying. Thank you for encouraging me. It feels like it might be a good starting point for me.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "I’m really glad to hear that. Taking that first step is always difficult, but you’ve made an important decision. 
If you need help finding the right therapist, or just want to talk, I’m here for you.", + }, + ], + [ + { + "role": "user", + "chat_time": "3 May 2025", + "content": "This week I’ve been feeling a bit off, especially when playing tennis. My body just doesn’t feel right.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "It sounds like you've been having some physical discomfort lately. Where specifically are you feeling unwell? Is there something particular about tennis that's been difficult?", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "I just feel a bit fatigued, and every time I play tennis, my movements feel off. I used to play pretty well, but lately, I’ve been feeling weak and my movements aren’t as coordinated as they used to be.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "Physical discomfort can definitely affect performance, especially in a sport like tennis that requires quick reactions and coordination. Have you noticed anything specific that might be causing these changes? Could it be overtraining, or is there another physical issue making you feel off?", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "I think it might be due to stress and lack of sleep recently. I just feel drained all the time. Plus, I’ve been frustrated with my tennis performance, which makes me feel even worse, like I’m stuck in a vicious cycle.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "Stress and poor sleep quality can definitely affect your physical performance, especially in sports. When you’re also feeling down about your performance, it’s easy to fall into a negative cycle. Have you tried anything to relieve some of the stress, or to improve your sleep?", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "I’ve tried some relaxation techniques, like deep breathing and simple meditation, but it doesn’t seem to help much. I still can’t focus during tennis, and I feel like my mind is distracted by other things.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "Relaxation techniques like deep breathing and meditation can help reduce stress, but sometimes we need a more structured approach to improve both our physical and mental state. For example, you might want to focus on specific aspects of your tennis training, or adjust your rest and recovery time. Have you thought about setting smaller goals for yourself? Like focusing on a specific tennis move each day, rather than expecting perfection right away? That might help you gradually regain confidence.", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "That’s a good idea. I think I’ve been expecting too much from myself, which just makes me more anxious. Maybe I can start with small practice sessions and gradually find my rhythm again.", + }, + { + "role": "assistant", + "chat_time": "3 May 2025", + "content": "Exactly! Setting small and clear goals will help reduce pressure and give you a sense of accomplishment. And don’t forget that rest is just as important—taking time to recover will help you feel better physically and mentally. Both physical and mental recovery are key to improving your performance and health.", + }, + { + "role": "user", + "chat_time": "3 May 2025", + "content": "Thanks, I feel like I need to rethink a few things. 
Maybe I really need to be kinder to myself and take things step by step.",
+        },
+        {
+            "role": "assistant",
+            "chat_time": "3 May 2025",
+            "content": "You’re on the right track. Rethinking things and giving yourself space and patience is so important. You’ve already taken the first step, now just keep moving forward, one step at a time. If you need anything, I’m always here to help.",
+        },
+    ],
+]
+
+# Acquiring memories
+memory = reader.get_memory(scene_data, type="chat", info={"user_id": "1234", "session_id": "2222"})
+
+for m_list in memory:
+    my_tree_textual_memory.add(m_list)
+
+results = my_tree_textual_memory.search(
+    "Talk about the user's childhood story?",
+    top_k=10,
+    info={"query": "Talk about the user's childhood story?", "user_id": "111", "session": "2234"},
+)
+for i, r in enumerate(results):
+    r = r.to_dict()
+    print(f"{i}'th similar result is: " + str(r["memory"]))
+print(f"Successfully searched {len(results)} memories")
+
+# Find related nodes
+related_nodes = my_tree_textual_memory.get_relevant_subgraph("Painting")
+
+# Get current memory size
+print(f"Current Memory Size is {my_tree_textual_memory.get_current_memory_size()}")
+
+logger.info("Start doc search example...")
+# Processing documents
+doc_paths = [
+    "./text1.txt",
+    "./text2.txt",
+]
+# Acquiring memories from documents
+doc_memory = reader.get_memory(doc_paths, "doc", info={"user_id": "1111", "session_id": "2222"})
+
+for m_list in doc_memory:
+    my_tree_textual_memory.add(m_list)
+
+results = my_tree_textual_memory.search(
+    "Tell me about what memos consist of?",
+    top_k=30,
+    info={"query": "Tell me about what memos consist of?", "user_id": "111", "session": "2234"},
+)
+for i, r in enumerate(results):
+    r = r.to_dict()
+    print(f"{i}'th similar result is: " + str(r["memory"]))
+print(f"Successfully searched {len(results)} memories")
+
+
+# Dump the memory store and clean up
+my_tree_textual_memory.dump("tmp/my_tree_textual_memory")
+my_tree_textual_memory.drop()
diff --git a/examples/mem_os/cot_usage_example.py b/examples/mem_os/cot_usage_example.py
new file mode 100644
index 00000000..e3d10fa7
--- /dev/null
+++ b/examples/mem_os/cot_usage_example.py
@@ -0,0 +1,342 @@
+#!/usr/bin/env python3
+"""
+MemOS CoT (Chain of Thought) Usage Example
+This example demonstrates how to use CoT functionality with tree textual memory.
+It shows how to:
+1. Decompose complex questions into sub-questions
+2. Get answers for sub-questions using tree_textual_memory
+3. 
Use JSON configuration files with environment variable overrides +""" + +import json +import os + +# Load environment variables +from dotenv import load_dotenv + +from memos.configs.llm import LLMConfigFactory +from memos.configs.mem_reader import SimpleStructMemReaderConfig +from memos.configs.memory import TreeTextMemoryConfig +from memos.mem_os.main import MOS +from memos.mem_reader.simple_struct import SimpleStructMemReader +from memos.memories.textual.tree import TreeTextMemory + + +load_dotenv() + + +def load_and_modify_config(config_path: str) -> dict: + """Load JSON config and modify it with environment variables.""" + with open(config_path) as f: + config = json.load(f) + + # Get environment variables + openai_api_key = os.getenv("OPENAI_API_KEY") + openai_base_url = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1") + + # Modify config to use ollama for embedder and gpt-4o-mini for LLMs + if "embedder" in config: + config["embedder"] = { + "backend": "ollama", + "config": {"model_name_or_path": "nomic-embed-text:latest"}, + } + + # Modify LLM configs to use gpt-4o-mini + if "llm" in config: + config["llm"] = { + "backend": "openai", + "config": { + "model_name_or_path": "gpt-4o-mini", + "api_key": openai_api_key, + "api_base": openai_base_url, + "temperature": 0.5, + "remove_think_prefix": True, + "max_tokens": 8192, + }, + } + + if "extractor_llm" in config: + config["extractor_llm"] = { + "backend": "openai", + "config": { + "model_name_or_path": "gpt-4o-mini", + "api_key": openai_api_key, + "api_base": openai_base_url, + "temperature": 0.5, + "remove_think_prefix": True, + "max_tokens": 8192, + }, + } + + if "dispatcher_llm" in config: + config["dispatcher_llm"] = { + "backend": "openai", + "config": { + "model_name_or_path": "gpt-4o-mini", + "api_key": openai_api_key, + "api_base": openai_base_url, + "temperature": 0.5, + "remove_think_prefix": True, + "max_tokens": 8192, + }, + } + + # Modify graph_db config if present + if "graph_db" in config: + neo4j_uri = os.getenv("NEO4J_URI", "bolt://localhost:7687") + neo4j_user = os.getenv("NEO4J_USER", "neo4j") + neo4j_password = os.getenv("NEO4J_PASSWORD", "12345678") + + config["graph_db"] = { + "backend": "neo4j", + "config": { + "uri": neo4j_uri, + "user": neo4j_user, + "password": neo4j_password, + "db_name": "testlcy", + "auto_create": True, + "embedding_dimension": 768, + }, + } + + return config + + +def setup_llm_config(): + """Setup LLM configuration for CoT operations.""" + # Get environment variables + openai_api_key = os.getenv("OPENAI_API_KEY") + openai_base_url = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1") + # Use ollama with gpt-4o-mini model + return LLMConfigFactory( + backend="openai", + config={ + "model_name_or_path": "gpt-4o-mini", + "api_key": openai_api_key, + "api_base": openai_base_url, + "temperature": 0.5, + "remove_think_prefix": True, + "max_tokens": 8192, + }, + ) + + +def create_tree_memory(): + """Create a tree textual memory with sample data.""" + print("Creating tree textual memory...") + + # Load and modify configurations + tree_config_dict = load_and_modify_config("examples/data/config/tree_config.json") + reader_config_dict = load_and_modify_config( + "examples/data/config/simple_struct_reader_config.json" + ) + + # Create config objects + tree_config = TreeTextMemoryConfig.model_validate(tree_config_dict) + reader_config = SimpleStructMemReaderConfig.model_validate(reader_config_dict) + + # Create tree memory + tree_memory = TreeTextMemory(tree_config) + 
tree_memory.delete_all() # Clear existing data + + # Create memory reader + reader = SimpleStructMemReader(reader_config) + + # Sample conversation data + sample_conversations = [ + [ + {"role": "user", "content": "Tell me about China and its capital."}, + { + "role": "assistant", + "content": "China is a country in East Asia. Beijing is its capital city.", + }, + {"role": "user", "content": "Who is Lang Ping?"}, + { + "role": "assistant", + "content": "Lang Ping is a famous Chinese volleyball coach and former player.", + }, + {"role": "user", "content": "What about Madagascar?"}, + { + "role": "assistant", + "content": "Madagascar is an island country in the Indian Ocean. It's known for its unique wildlife.", + }, + {"role": "user", "content": "Tell me about trade between China and Madagascar."}, + { + "role": "assistant", + "content": "China and Madagascar have developed trade relations, particularly in agriculture and mining.", + }, + {"role": "user", "content": "What about the essential oil industry in Madagascar?"}, + { + "role": "assistant", + "content": "The essential oil industry is growing in Madagascar, especially on Nosy Be Island where vanilla and ylang-ylang are produced.", + }, + ] + ] + + # Acquire memories using the reader + memories = reader.get_memory( + sample_conversations, type="chat", info={"user_id": "cot_user", "session_id": "cot_session"} + ) + + # Add memories to tree structure + for memory_list in memories: + tree_memory.add(memory_list) + + print("✓ Added sample conversations to tree memory") + return tree_memory + + +def cot_decompose(): + """Test the cot_decompose functionality.""" + print("\n=== Testing CoT Decomposition ===") + + # Setup LLM config + llm_config = setup_llm_config() + + # Test questions + test_questions = [ + "Who is the current head coach of the gymnastics team in the capital of the country that Lang Ping represents?", + "What is the weather like today?", + "How did the trade relationship between Madagascar and China develop, and how does this relationship affect the market expansion of the essential oil industry on Nosy Be Island?", + ] + + for i, question in enumerate(test_questions, 1): + print(f"\nTest {i}: {question}") + result = MOS.cot_decompose(question, llm_config) + print(f"✓ Decomposition result: {result}") + + if result.get("is_complex", False): + sub_questions = result.get("sub_questions", []) + print(f"✓ Found {len(sub_questions)} sub-questions:") + for j, sub_q in enumerate(sub_questions, 1): + print(f" {j}. {sub_q}") + else: + print("✓ Question is not complex, no decomposition needed.") + + return llm_config + + +def get_sub_answers_with_tree_memory(): + """Test get_sub_answers with tree textual memory.""" + print("\n=== Testing get_sub_answers with Tree Textual Memory ===") + + # Setup + llm_config = setup_llm_config() + tree_memory = create_tree_memory() + + # Test sub-questions + sub_questions = [ + "Which country does Lang Ping represent in volleyball?", + "What is the capital of this country?", + "Who is the current head coach of the gymnastics team in this capital?", + ] + + print("Sub-questions to answer:") + for i, q in enumerate(sub_questions, 1): + print(f" {i}. 
{q}") + print("\nGenerating answers using tree memory and LLM...") + sub_questions, sub_answers = MOS.get_sub_answers( + sub_questions=sub_questions, search_engine=tree_memory, llm_config=llm_config, top_k=3 + ) + + print("✓ Generated answers:") + for i, (question, answer) in enumerate(zip(sub_questions, sub_answers, strict=False), 1): + print(f"\n Sub-question {i}: {question}") + print(f" Answer: {answer}") + + +def complete_cot_workflow(): + """Test the complete CoT workflow from decomposition to final synthesis.""" + print("\n=== Testing Complete CoT Workflow ===") + + # Setup + llm_config = setup_llm_config() + tree_memory = create_tree_memory() + + # Complex question + complex_question = "How did the trade relationship between Madagascar and China develop, and how does this relationship affect the market expansion of the essential oil industry on Nosy Be Island?" + + print(f"Original question: {complex_question}") + + try: + # Step 1: Decompose the question + print("\n1. Decomposing question...") + decomposition_result = MOS.cot_decompose(complex_question, llm_config) + print(f"✓ Decomposition result: {decomposition_result}") + + if not decomposition_result.get("is_complex", False): + print("Question is not complex, no decomposition needed.") + return + + sub_questions = decomposition_result.get("sub_questions", []) + print(f"✓ Found {len(sub_questions)} sub-questions:") + for i, q in enumerate(sub_questions, 1): + print(f" {i}. {q}") + + # Step 2: Get answers for sub-questions + print("\n2. Getting answers for sub-questions...") + sub_questions, sub_answers = MOS.get_sub_answers( + sub_questions=sub_questions, search_engine=tree_memory, llm_config=llm_config, top_k=3 + ) + + print("✓ Generated answers:") + for i, (question, answer) in enumerate(zip(sub_questions, sub_answers, strict=False), 1): + print(f"\n Sub-question {i}: {question}") + print(f" Answer: {answer}") + + # Step 3: Generate final synthesis + print("\n3. Generating final synthesis...") + # Build the sub-questions and answers text + qa_text = "" + for i, (question, answer) in enumerate(zip(sub_questions, sub_answers, strict=False), 1): + qa_text += f"Q{i}: {question}\nA{i}: {answer}\n\n" + + synthesis_prompt = f"""You are an expert at synthesizing information from multiple sources to provide comprehensive answers. + +Sub-questions and their answers: +{qa_text} +Please synthesize these answers into a comprehensive response that: +1. Addresses the original question completely +2. Integrates information from all sub-questions +3. Provides clear reasoning and connections +4. 
Is well-structured and easy to understand + +Original question: {complex_question} + +Your response:""" + + # Generate final answer + from memos.llms.factory import LLMFactory + + llm = LLMFactory.from_config(llm_config) + messages = [ + { + "role": "system", + "content": "You are a helpful assistant that synthesizes information from multiple sources.", + }, + {"role": "user", "content": synthesis_prompt}, + ] + + final_answer = llm.generate(messages) + print(f"\n✓ Final synthesized answer:\n{final_answer}") + + except Exception as e: + print(f"✗ Error in complete workflow: {e}") + + +def main(): + """Main function to run the CoT example.""" + print("MemOS CoT (Chain of Thought) Usage Example") + print("=" * 60) + + # Run the examples + cot_decompose() + get_sub_answers_with_tree_memory() + complete_cot_workflow() + + print("\n" + "=" * 60) + print("✓ All examples completed successfully!") + + +if __name__ == "__main__": + main() diff --git a/examples/mem_os/simple_openapi_memos.py b/examples/mem_os/simple_openapi_memos.py index e55a250d..a9faa3dc 100644 --- a/examples/mem_os/simple_openapi_memos.py +++ b/examples/mem_os/simple_openapi_memos.py @@ -1,14 +1,19 @@ +import os import time import uuid from datetime import datetime +from dotenv import load_dotenv + from memos.configs.mem_cube import GeneralMemCubeConfig from memos.configs.mem_os import MOSConfig from memos.mem_cube.general import GeneralMemCube from memos.mem_os.main import MOS +load_dotenv() + # 1. Create MOS Config and set openai config print(f"🚀 [{datetime.now().strftime('%H:%M:%S')}] Starting to create MOS configuration...") start_time = time.time() @@ -18,17 +23,17 @@ # 1.1 Set openai config openapi_config = { - "model_name_or_path": "gpt-4o", + "model_name_or_path": "gpt-4o-mini", "temperature": 0.8, "max_tokens": 1024, "top_p": 0.9, "top_k": 50, "remove_think_prefix": True, - "api_key": "sk-xxxxxx", - "api_base": "https://api.openai.com/v1", + "api_key": os.getenv("OPENAI_API_KEY", "sk-xxxxx"), + "api_base": os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1"), } # 1.2 Set neo4j config -neo4j_uri = "bolt://localhost:7687" +neo4j_uri = os.getenv("NEO4J_URI", "bolt://localhost:7687") # 1.3 Create MOS Config config = { @@ -69,6 +74,7 @@ } mos_config = MOSConfig(**config) +# you can set PRO_MODE to True to enable CoT enhancement mos_config.PRO_MODE = True mos = MOS(mos_config) print( diff --git a/src/memos/configs/internet_retriever.py b/src/memos/configs/internet_retriever.py new file mode 100644 index 00000000..56f892ac --- /dev/null +++ b/src/memos/configs/internet_retriever.py @@ -0,0 +1,81 @@ +"""Configuration classes for internet retrievers.""" + +from typing import Any, ClassVar + +from pydantic import Field, field_validator, model_validator + +from memos.configs.base import BaseConfig +from memos.exceptions import ConfigurationError + + +class BaseInternetRetrieverConfig(BaseConfig): + """Base configuration class for internet retrievers.""" + + api_key: str = Field(..., description="API key for the search service") + search_engine_id: str | None = Field( + None, description="Search engine ID (required for Google Custom Search)" + ) + + +class GoogleCustomSearchConfig(BaseInternetRetrieverConfig): + """Configuration class for Google Custom Search API.""" + + search_engine_id: str = Field(..., description="Google Custom Search Engine ID (cx parameter)") + max_results: int = Field(default=20, description="Maximum number of results to retrieve") + num_per_request: int = Field( + default=10, description="Number of 
results per API request (max 10 for Google)" + ) + + +class BingSearchConfig(BaseInternetRetrieverConfig): + """Configuration class for Bing Search API.""" + + endpoint: str = Field( + default="https://api.bing.microsoft.com/v7.0/search", description="Bing Search API endpoint" + ) + max_results: int = Field(default=20, description="Maximum number of results to retrieve") + num_per_request: int = Field(default=10, description="Number of results per API request") + + +class XinyuSearchConfig(BaseInternetRetrieverConfig): + """Configuration class for Xinyu Search API.""" + + search_engine_id: str | None = Field( + None, description="Not used for Xinyu Search (kept for compatibility)" + ) + max_results: int = Field(default=20, description="Maximum number of results to retrieve") + num_per_request: int = Field( + default=10, description="Number of results per API request (not used for Xinyu)" + ) + + +class InternetRetrieverConfigFactory(BaseConfig): + """Factory class for creating internet retriever configurations.""" + + backend: str | None = Field( + None, description="Backend for internet retriever (google, bing, etc.)" + ) + config: dict[str, Any] | None = Field( + None, description="Configuration for the internet retriever backend" + ) + + backend_to_class: ClassVar[dict[str, Any]] = { + "google": GoogleCustomSearchConfig, + "bing": BingSearchConfig, + "xinyu": XinyuSearchConfig, + } + + @field_validator("backend") + @classmethod + def validate_backend(cls, backend: str | None) -> str | None: + """Validate the backend field.""" + if backend is not None and backend not in cls.backend_to_class: + raise ConfigurationError(f"Invalid internet retriever backend: {backend}") + return backend + + @model_validator(mode="after") + def create_config(self) -> "InternetRetrieverConfigFactory": + if self.backend is not None: + config_class = self.backend_to_class[self.backend] + self.config = config_class(**self.config) + return self diff --git a/src/memos/configs/mem_os.py b/src/memos/configs/mem_os.py index d4135a2a..96b4094e 100644 --- a/src/memos/configs/mem_os.py +++ b/src/memos/configs/mem_os.py @@ -57,6 +57,10 @@ class MOSConfig(BaseConfig): default=False, description="Enable memory scheduler for automated memory management", ) + PRO_MODE: bool = Field( + default=False, + description="Enable PRO mode for complex query decomposition", + ) class MemOSConfigFactory(BaseConfig): diff --git a/src/memos/configs/memory.py b/src/memos/configs/memory.py index c728736c..f18ca2e8 100644 --- a/src/memos/configs/memory.py +++ b/src/memos/configs/memory.py @@ -5,6 +5,7 @@ from memos.configs.base import BaseConfig from memos.configs.embedder import EmbedderConfigFactory from memos.configs.graph_db import GraphDBConfigFactory +from memos.configs.internet_retriever import InternetRetrieverConfigFactory from memos.configs.llm import LLMConfigFactory from memos.configs.vec_db import VectorDBConfigFactory from memos.exceptions import ConfigurationError @@ -133,7 +134,7 @@ class GeneralTextMemoryConfig(BaseTextMemoryConfig): class TreeTextMemoryConfig(BaseTextMemoryConfig): - """General memory configuration class.""" + """Tree text memory configuration class.""" extractor_llm: LLMConfigFactory = Field( ..., @@ -155,6 +156,10 @@ class TreeTextMemoryConfig(BaseTextMemoryConfig): default_factory=GraphDBConfigFactory, description="Graph database configuration for the tree-memory storage", ) + internet_retriever: InternetRetrieverConfigFactory | None = Field( + None, + description="Internet retriever configuration 
(optional)", + ) # ─── 3. Global Memory Config Factory ────────────────────────────────────────── diff --git a/src/memos/mem_os/main.py b/src/memos/mem_os/main.py index bfdde39b..2c479b45 100644 --- a/src/memos/mem_os/main.py +++ b/src/memos/mem_os/main.py @@ -1,5 +1,21 @@ +import concurrent.futures +import json + +from typing import Any + from memos.configs.mem_os import MOSConfig +from memos.llms.factory import LLMFactory +from memos.log import get_logger from memos.mem_os.core import MOSCore +from memos.memories.textual.base import BaseTextMemory +from memos.templates.mos_prompts import ( + COT_DECOMPOSE_PROMPT, + PRO_MODE_WELCOME_MESSAGE, + SYNTHESIS_PROMPT, +) + + +logger = get_logger(__name__) class MOS(MOSCore): @@ -9,4 +25,479 @@ class MOS(MOSCore): """ def __init__(self, config: MOSConfig): + self.enable_cot = config.PRO_MODE + if config.PRO_MODE: + print(PRO_MODE_WELCOME_MESSAGE) + logger.info(PRO_MODE_WELCOME_MESSAGE) super().__init__(config) + + def chat(self, query: str, user_id: str | None = None) -> str: + """ + Enhanced chat method with optional CoT (Chain of Thought) enhancement. + + Args: + query (str): The user's query. + user_id (str, optional): User ID for context. + + Returns: + str: The response from the MOS. + """ + # Check if CoT enhancement is enabled (either explicitly or via PRO mode) + + if not self.enable_cot: + # Use the original chat method from core + return super().chat(query, user_id) + + # Enhanced chat with CoT decomposition + return self._chat_with_cot_enhancement(query, user_id) + + def _chat_with_cot_enhancement(self, query: str, user_id: str | None = None) -> str: + """ + Chat with CoT enhancement for complex query decomposition. + This method includes all the same validation and processing logic as the core chat method. + + Args: + query (str): The user's query. + user_id (str, optional): User ID for context. + + Returns: + str: The enhanced response. 
+ """ + # Step 1: Perform all the same validation and setup as core chat method + target_user_id = user_id if user_id is not None else self.user_id + accessible_cubes = self.user_manager.get_user_cubes(target_user_id) + user_cube_ids = [cube.cube_id for cube in accessible_cubes] + + # Register chat history if needed + if target_user_id not in self.chat_history_manager: + self._register_chat_history(target_user_id) + + chat_history = self.chat_history_manager[target_user_id] + + try: + # Step 2: Decompose the query using CoT + logger.info(f"🔍 [CoT] Decomposing query: {query}") + decomposition_result = self.cot_decompose( + query, self.config.chat_model, target_user_id, self.chat_llm + ) + + # Check if the query is complex and needs decomposition + if not decomposition_result.get("is_complex", False): + logger.info("🔍 [CoT] Query is not complex, using standard chat") + return super().chat(query, user_id) + + sub_questions = decomposition_result.get("sub_questions", []) + logger.info(f"🔍 [CoT] Decomposed into {len(sub_questions)} sub-questions") + + # Step 3: Get search engine for sub-questions (with proper validation) + search_engine = self._get_search_engine_for_cot_with_validation(user_cube_ids) + if not search_engine: + logger.warning("🔍 [CoT] No search engine available, using standard chat") + return super().chat(query, user_id) + + # Step 4: Get answers for sub-questions + logger.info("🔍 [CoT] Getting answers for sub-questions...") + sub_questions, sub_answers = self.get_sub_answers( + sub_questions=sub_questions, + search_engine=search_engine, + llm_config=self.config.chat_model, + user_id=target_user_id, + top_k=getattr(self.config, "cot_top_k", 3), + llm=self.chat_llm, + ) + + # Step 5: Generate enhanced response using sub-answers + logger.info("🔍 [CoT] Generating enhanced response...") + enhanced_response = self._generate_enhanced_response_with_context( + original_query=query, + sub_questions=sub_questions, + sub_answers=sub_answers, + chat_history=chat_history, + user_id=target_user_id, + search_engine=search_engine, + ) + + # Step 6: Update chat history (same as core method) + chat_history.chat_history.append({"role": "user", "content": query}) + chat_history.chat_history.append({"role": "assistant", "content": enhanced_response}) + self.chat_history_manager[target_user_id] = chat_history + + # Step 7: Submit message to scheduler (same as core method) + if len(accessible_cubes) == 1: + mem_cube_id = accessible_cubes[0].cube_id + mem_cube = self.mem_cubes[mem_cube_id] + if self.enable_mem_scheduler and self.mem_scheduler is not None: + from datetime import datetime + + from memos.mem_scheduler.modules.schemas import ( + ANSWER_LABEL, + ScheduleMessageItem, + ) + + message_item = ScheduleMessageItem( + user_id=target_user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + label=ANSWER_LABEL, + content=enhanced_response, + timestamp=datetime.now(), + ) + self.mem_scheduler.submit_messages(messages=[message_item]) + + return enhanced_response + + except Exception as e: + logger.error(f"🔍 [CoT] Error in CoT enhancement: {e}") + logger.info("🔍 [CoT] Falling back to standard chat") + return super().chat(query, user_id) + + def _get_search_engine_for_cot_with_validation( + self, user_cube_ids: list[str] + ) -> BaseTextMemory | None: + """ + Get the best available search engine for CoT operations with proper validation. + + Args: + user_cube_ids (list[str]): List of cube IDs the user has access to. + + Returns: + BaseTextMemory or None: The search engine to use for CoT. 
+ """ + if not self.mem_cubes: + return None + + # Get the first available text memory from user's accessible cubes + for mem_cube_id, mem_cube in self.mem_cubes.items(): + if mem_cube_id not in user_cube_ids: + continue + if mem_cube.text_mem: + return mem_cube.text_mem + + return None + + def _generate_enhanced_response_with_context( + self, + original_query: str, + sub_questions: list[str], + sub_answers: list[str], + chat_history: Any, + user_id: str | None = None, + search_engine: BaseTextMemory | None = None, + ) -> str: + """ + Generate an enhanced response using sub-questions and their answers, with chat context. + + Args: + original_query (str): The original user query. + sub_questions (list[str]): List of sub-questions. + sub_answers (list[str]): List of answers to sub-questions. + chat_history: The user's chat history. + user_id (str, optional): User ID for context. + + Returns: + str: The enhanced response. + """ + # Build the synthesis prompt + qa_text = "" + for i, (question, answer) in enumerate(zip(sub_questions, sub_answers, strict=False), 1): + qa_text += f"Q{i}: {question}\nA{i}: {answer}\n\n" + + # Build messages with chat history context (similar to core method) + if (search_engine is not None) and self.config.enable_textual_memory: + if self.enable_cot: + search_memories = search_engine.search( + original_query, top_k=self.config.top_k, mode="fine" + ) + else: + search_memories = search_engine.search( + original_query, top_k=self.config.top_k, mode="fast" + ) + system_prompt = self._build_system_prompt( + search_memories + ) # Use the same system prompt builder + else: + system_prompt = self._build_system_prompt() + current_messages = [ + {"role": "system", "content": system_prompt + SYNTHESIS_PROMPT.format(qa_text=qa_text)}, + *chat_history.chat_history, + { + "role": "user", + "content": original_query, + }, + ] + + # Handle activation memory if enabled (same as core method) + past_key_values = None + if self.config.enable_activation_memory: + assert self.config.chat_model.backend == "huggingface", ( + "Activation memory only used for huggingface backend." + ) + # Get accessible cubes for the user + target_user_id = user_id if user_id is not None else self.user_id + accessible_cubes = self.user_manager.get_user_cubes(target_user_id) + user_cube_ids = [cube.cube_id for cube in accessible_cubes] + + for mem_cube_id, mem_cube in self.mem_cubes.items(): + if mem_cube_id not in user_cube_ids: + continue + if mem_cube.act_mem: + kv_cache = next(iter(mem_cube.act_mem.get_all()), None) + past_key_values = ( + kv_cache.memory if (kv_cache and hasattr(kv_cache, "memory")) else None + ) + break + + try: + # Generate the enhanced response using the chat LLM with same parameters as core + if past_key_values is not None: + enhanced_response = self.chat_llm.generate( + current_messages, past_key_values=past_key_values + ) + else: + enhanced_response = self.chat_llm.generate(current_messages) + + logger.info("🔍 [CoT] Generated enhanced response") + return enhanced_response + except Exception as e: + logger.error(f"🔍 [CoT] Error generating enhanced response: {e}") + # Fallback to standard chat + return super().chat(original_query, user_id) + + @classmethod + def cot_decompose( + cls, query: str, llm_config: Any, user_id: str | None = None, llm: LLMFactory | None = None + ) -> list[str] | dict[str, Any]: + """ + Decompose a complex query into sub-questions using Chain of Thought reasoning. 
+ + Args: + query (str): The complex query to decompose + llm_config: LLM configuration for decomposition + user_id (str, optional): User ID for context + + Returns: + Union[List[str], Dict[str, Any]]: List of decomposed sub-questions or dict with complexity analysis + """ + # Create a temporary LLM instance for decomposition + if llm is None: + llm = LLMFactory.from_config(llm_config) + + # System prompt for CoT decomposition with complexity analysis + system_prompt = COT_DECOMPOSE_PROMPT.format(query=query) + + messages = [{"role": "system", "content": system_prompt}] + + try: + response = llm.generate(messages) + # Try to parse JSON response + result = json.loads(response) + return result + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse JSON response from LLM: {e}") + logger.warning(f"Raw response: {response}") + + # Try to extract JSON-like content from the response + try: + # Look for JSON-like content between curly braces + import re + + json_match = re.search(r"\{.*\}", response, re.DOTALL) + if json_match: + json_str = json_match.group(0) + result = json.loads(json_str) + return result + except Exception: + pass + + # If all parsing attempts fail, return default + return {"is_complex": False, "sub_questions": []} + except Exception as e: + logger.error(f"Unexpected error in cot_decompose: {e}") + return {"is_complex": False, "sub_questions": []} + + @classmethod + def get_sub_answers( + cls, + sub_questions: list[str] | dict[str, Any], + search_results: dict[str, Any] | None = None, + search_engine: BaseTextMemory | None = None, + llm_config: LLMFactory | None = None, + user_id: str | None = None, + top_k: int = 5, + llm: LLMFactory | None = None, + ) -> tuple[list[str], list[str]]: + """ + Get answers for sub-questions using either search results or a search engine. 
+ + Args: + sub_questions (Union[List[str], Dict[str, Any]]): List of sub-questions from cot_decompose or dict with analysis + search_results (Dict[str, Any], optional): Search results containing relevant information + search_engine (BaseTextMemory, optional): Text memory engine for searching + llm_config (Any, optional): LLM configuration for processing (required if search_engine is provided) + user_id (str, optional): User ID for context + top_k (int): Number of top results to retrieve from search engine + + Returns: + Tuple[List[str], List[str]]: (sub_questions, sub_answers) + """ + # Extract sub-questions from decomposition result if needed + if isinstance(sub_questions, dict): + if not sub_questions.get("is_complex", False): + return [], [] + sub_questions = sub_questions.get("sub_questions", []) + + if not sub_questions: + return [], [] + + # Validate inputs + if search_results is None and search_engine is None: + raise ValueError("Either search_results or search_engine must be provided") + if llm is None: + llm = LLMFactory.from_config(llm_config) + + # Step 1: Get search results if search_engine is provided + if search_engine is not None: + search_results = cls._search_with_engine(sub_questions, search_engine, top_k) + + # Step 2: Generate answers for each sub-question using LLM in parallel + def generate_answer_for_question(question_index: int, sub_question: str) -> tuple[int, str]: + """Generate answer for a single sub-question.""" + # Extract relevant information from search results + relevant_info = [] + if search_results and search_results.get("text_mem"): + for cube_result in search_results["text_mem"]: + for memory in cube_result.get("memories", []): + relevant_info.append(memory.memory) + + # Build system prompt with memories (similar to MOSCore._build_system_prompt) + base_prompt = ( + "You are a knowledgeable and helpful AI assistant. " + "You have access to relevant information that helps you provide accurate answers. " + "Use the provided information to answer the question comprehensively. " + "If the information is not sufficient, acknowledge the limitations." + ) + + # Add memory context if available + if relevant_info: + memory_context = "\n\n## Relevant Information:\n" + for j, info in enumerate(relevant_info[:top_k], 1): # Take top 3 most relevant + memory_context += f"{j}. {info}\n" + system_prompt = base_prompt + memory_context + else: + system_prompt = ( + base_prompt + + "\n\n## Relevant Information:\nNo specific information found in memory." 
+ ) + + # Create messages for LLM + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": sub_question}, + ] + + try: + # Generate answer using LLM + response = llm.generate(messages) + return question_index, response + except Exception as e: + logger.error(f"Failed to generate answer for sub-question '{sub_question}': {e}") + return question_index, f"Unable to generate answer for: {sub_question}" + + # Generate answers in parallel while maintaining order + sub_answers = [None] * len(sub_questions) + with concurrent.futures.ThreadPoolExecutor( + max_workers=min(len(sub_questions), 10) + ) as executor: + # Submit all answer generation tasks + future_to_index = { + executor.submit(generate_answer_for_question, i, question): i + for i, question in enumerate(sub_questions) + } + + # Collect results as they complete, but store them in the correct position + for future in concurrent.futures.as_completed(future_to_index): + try: + question_index, answer = future.result() + sub_answers[question_index] = answer + except Exception as e: + question_index = future_to_index[future] + logger.error( + f"Exception occurred while generating answer for question at index {question_index}: {e}" + ) + sub_answers[question_index] = ( + f"Error generating answer for question {question_index + 1}" + ) + + return sub_questions, sub_answers + + @classmethod + def _search_with_engine( + cls, sub_questions: list[str], search_engine: BaseTextMemory, top_k: int + ) -> dict[str, Any]: + """ + Search for sub-questions using the provided search engine in parallel. + + Args: + sub_questions (List[str]): List of sub-questions to search for + search_engine (BaseTextMemory): Text memory engine for searching + top_k (int): Number of top results to retrieve + + Returns: + Dict[str, Any]: Search results in the expected format + """ + + def search_single_question(question: str) -> list[Any]: + """Search for a single question using the search engine.""" + try: + # Handle different search method signatures + if hasattr(search_engine, "search"): + # Try different parameter combinations based on the engine type + try: + # For tree_text memory + return search_engine.search(question, top_k, mode="fast") + except TypeError: + try: + # For general_text memory + return search_engine.search(question, top_k) + except TypeError: + # For naive_text memory + return search_engine.search(question, top_k) + else: + return [] + except Exception as e: + logger.error(f"Search failed for question '{question}': {e}") + return [] + + # Search in parallel while maintaining order + all_memories = [] + with concurrent.futures.ThreadPoolExecutor( + max_workers=min(len(sub_questions), 10) + ) as executor: + # Submit all search tasks and keep track of their order + future_to_index = { + executor.submit(search_single_question, question): i + for i, question in enumerate(sub_questions) + } + + # Initialize results list with None values to maintain order + results = [None] * len(sub_questions) + + # Collect results as they complete, but store them in the correct position + for future in concurrent.futures.as_completed(future_to_index): + index = future_to_index[future] + try: + memories = future.result() + results[index] = memories + except Exception as e: + logger.error( + f"Exception occurred while searching for question at index {index}: {e}" + ) + results[index] = [] + + # Combine all results in the correct order + for result in results: + if result is not None: + all_memories.extend(result) + + # Format results in the 
expected structure
+        return {"text_mem": [{"cube_id": "search_engine", "memories": all_memories}]}
diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py
index de830380..84f1a1c1 100644
--- a/src/memos/memories/textual/tree.py
+++ b/src/memos/memories/textual/tree.py
@@ -15,6 +15,9 @@
 from memos.memories.textual.base import BaseTextMemory
 from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata
 from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager
+from memos.memories.textual.tree_text_memory.retrieve.internet_retriever_factory import (
+    InternetRetrieverFactory,
+)
 from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher
 from memos.types import MessageList
@@ -34,6 +37,18 @@ def __init__(self, config: TreeTextMemoryConfig):
         self.graph_store: Neo4jGraphDB = GraphStoreFactory.from_config(config.graph_db)
         self.memory_manager: MemoryManager = MemoryManager(self.graph_store, self.embedder)
 
+        # Create internet retriever if configured
+        self.internet_retriever = None
+        if config.internet_retriever is not None:
+            self.internet_retriever = InternetRetrieverFactory.from_config(
+                config.internet_retriever, self.embedder
+            )
+            logger.info(
+                f"Internet retriever initialized with backend: {config.internet_retriever.backend}"
+            )
+        else:
+            logger.info("No internet retriever configured")
+
     def add(self, memories: list[TextualMemoryItem | dict[str, Any]]) -> None:
         """Add memories.
         Args:
@@ -66,7 +81,13 @@ def get_current_memory_size(self) -> dict[str, int]:
         return self.memory_manager.get_current_memory_size()
 
     def search(
-        self, query: str, top_k: int, info=None, mode: str = "fast", memory_type: str = "All"
+        self,
+        query: str,
+        top_k: int,
+        info=None,
+        mode: str = "fast",
+        memory_type: str = "All",
+        manual_close_internet: bool = False,
     ) -> list[TextualMemoryItem]:
         """Search for memories based on a query.
         User query -> TaskGoalParser -> MemoryPathResolver ->
@@ -80,10 +101,21 @@ def search(
             - 'fine': Uses a more detailed search process, invoking large models for higher precision, but slower performance.
             memory_type (str): Type restriction for search. ['All', 'WorkingMemory', 'LongTermMemory', 'UserMemory']
+            manual_close_internet (bool): If True, the internet retriever is disabled for this search; this takes priority over the retriever configured in the config.
         Returns:
             list[TextualMemoryItem]: List of matching memories.
""" - searcher = Searcher(self.dispatcher_llm, self.graph_store, self.embedder) + if (self.internet_retriever is not None) and manual_close_internet: + logger.warning( + "Internet retriever is init by config , but this search set manual_close_internet is True and will close it" + ) + self.internet_retriever = None + searcher = Searcher( + self.dispatcher_llm, + self.graph_store, + self.embedder, + internet_retriever=self.internet_retriever, + ) return searcher.search(query, top_k, info, mode, memory_type) def get_relevant_subgraph( diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/internet_retriever.py b/src/memos/memories/textual/tree_text_memory/retrieve/internet_retriever.py new file mode 100644 index 00000000..de4f3646 --- /dev/null +++ b/src/memos/memories/textual/tree_text_memory/retrieve/internet_retriever.py @@ -0,0 +1,263 @@ +"""Internet retrieval module for tree text memory.""" + +import uuid + +from datetime import datetime + +import requests + +from memos.embedders.factory import OllamaEmbedder +from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata + + +class GoogleCustomSearchAPI: + """Google Custom Search API Client""" + + def __init__( + self, api_key: str, search_engine_id: str, max_results: int = 20, num_per_request: int = 10 + ): + """ + Initialize Google Custom Search API client + + Args: + api_key: Google API key + search_engine_id: Search engine ID (cx parameter) + max_results: Maximum number of results to retrieve + num_per_request: Number of results per API request + """ + self.api_key = api_key + self.search_engine_id = search_engine_id + self.max_results = max_results + self.num_per_request = min(num_per_request, 10) # Google API limits to 10 + self.base_url = "https://www.googleapis.com/customsearch/v1" + + def search(self, query: str, num_results: int | None = None, start_index: int = 1) -> dict: + """ + Execute search request + + Args: + query: Search query + num_results: Number of results to return (uses config default if None) + start_index: Starting index (default 1) + + Returns: + Dictionary containing search results + """ + if num_results is None: + num_results = self.num_per_request + + params = { + "key": self.api_key, + "cx": self.search_engine_id, + "q": query, + "num": min(num_results, self.num_per_request), + "start": start_index, + } + + try: + response = requests.get(self.base_url, params=params) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"Google search request failed: {e}") + return {} + + def get_all_results(self, query: str, max_results: int | None = None) -> list[dict]: + """ + Get all search results (with pagination) + + Args: + query: Search query + max_results: Maximum number of results (uses config default if None) + + Returns: + List of all search results + """ + if max_results is None: + max_results = self.max_results + + all_results = [] + start_index = 1 + + while len(all_results) < max_results: + search_data = self.search(query, start_index=start_index) + + if not search_data or "items" not in search_data: + break + + all_results.extend(search_data["items"]) + + # Check if there are more results + if len(search_data["items"]) < self.num_per_request: + break + + start_index += self.num_per_request + + # Avoid infinite loop + if start_index > 100: + break + + return all_results[:max_results] + + +class InternetGoogleRetriever: + """Internet retriever that converts search results to TextualMemoryItem format""" + + def 
__init__( + self, + api_key: str, + search_engine_id: str, + embedder: OllamaEmbedder, + max_results: int = 20, + num_per_request: int = 10, + ): + """ + Initialize internet retriever + + Args: + api_key: Google API key + search_engine_id: Search engine ID + embedder: Embedder instance for generating embeddings + max_results: Maximum number of results to retrieve + num_per_request: Number of results per API request + """ + self.google_api = GoogleCustomSearchAPI( + api_key, search_engine_id, max_results=max_results, num_per_request=num_per_request + ) + self.embedder = embedder + + def retrieve_from_internet( + self, query: str, top_k: int = 10, parsed_goal=None + ) -> list[TextualMemoryItem]: + """ + Retrieve information from the internet and convert to TextualMemoryItem format + + Args: + query: Search query + top_k: Number of results to return + parsed_goal: Parsed task goal (optional) + + Returns: + List of TextualMemoryItem + """ + # Get search results + search_results = self.google_api.get_all_results(query, max_results=top_k) + + # Convert to TextualMemoryItem format + memory_items = [] + + for _, result in enumerate(search_results): + # Extract basic information + title = result.get("title", "") + snippet = result.get("snippet", "") + link = result.get("link", "") + display_link = result.get("displayLink", "") + + # Combine memory content + memory_content = f"Title: {title}\nSummary: {snippet}\nSource: {link}" + # Create metadata + metadata = TreeNodeTextualMemoryMetadata( + user_id=None, + session_id=None, + status="activated", + type="fact", # Internet search results are usually factual information + memory_time=datetime.now().strftime("%Y-%m-%d"), + source="web", + confidence=85.0, # Confidence level for internet information + entities=self._extract_entities(title, snippet), + tags=self._extract_tags(title, snippet, parsed_goal), + visibility="public", + memory_type="LongTermMemory", # Internet search results as working memory + key=title, + sources=[link] if link else [], + embedding=self.embedder.embed([memory_content])[0], # Can add embedding later + created_at=datetime.now().isoformat(), + usage=[], + background=f"Internet search result from {display_link}", + ) + + # Create TextualMemoryItem + memory_item = TextualMemoryItem( + id=str(uuid.uuid4()), memory=memory_content, metadata=metadata + ) + + memory_items.append(memory_item) + + return memory_items + + def _extract_entities(self, title: str, snippet: str) -> list[str]: + """ + Extract entities from title and snippet + + Args: + title: Title + snippet: Snippet + + Returns: + List of entities + """ + # Simple entity extraction logic, can be improved as needed + text = f"{title} {snippet}" + entities = [] + + # Extract possible organization names (with common suffixes) + org_suffixes = ["Inc", "Corp", "LLC", "Ltd", "Company", "University", "Institute"] + words = text.split() + for i, word in enumerate(words): + if word in org_suffixes and i > 0: + entities.append(f"{words[i - 1]} {word}") + + # Extract possible dates + import re + + date_pattern = r"\d{4}-\d{2}-\d{2}|\d{1,2}/\d{1,2}/\d{4}|\w+ \d{1,2}, \d{4}" + dates = re.findall(date_pattern, text) + entities.extend(dates) + + return entities[:5] # Limit number of entities + + def _extract_tags(self, title: str, snippet: str, parsed_goal=None) -> list[str]: + """ + Extract tags from title and snippet + + Args: + title: Title + snippet: Snippet + parsed_goal: Parsed task goal + + Returns: + List of tags + """ + tags = [] + + # Extract tags from parsed goal + if 
parsed_goal: + if hasattr(parsed_goal, "topic") and parsed_goal.topic: + tags.append(parsed_goal.topic) + if hasattr(parsed_goal, "concept") and parsed_goal.concept: + tags.append(parsed_goal.concept) + + # Extract keywords from text + text = f"{title} {snippet}".lower() + + # Simple keyword extraction + keywords = [ + "news", + "report", + "article", + "study", + "research", + "analysis", + "update", + "announcement", + "policy", + "memo", + "document", + ] + + for keyword in keywords: + if keyword in text: + tags.append(keyword) + + # Remove duplicates and limit count + return list(set(tags))[:10] diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/internet_retriever_factory.py b/src/memos/memories/textual/tree_text_memory/retrieve/internet_retriever_factory.py new file mode 100644 index 00000000..d6af5944 --- /dev/null +++ b/src/memos/memories/textual/tree_text_memory/retrieve/internet_retriever_factory.py @@ -0,0 +1,89 @@ +"""Factory for creating internet retrievers.""" + +from typing import Any, ClassVar + +from memos.configs.internet_retriever import InternetRetrieverConfigFactory +from memos.embedders.base import BaseEmbedder +from memos.memories.textual.tree_text_memory.retrieve.internet_retriever import ( + InternetGoogleRetriever, +) +from memos.memories.textual.tree_text_memory.retrieve.xinyusearch import XinyuSearchRetriever + + +class InternetRetrieverFactory: + """Factory class for creating internet retriever instances.""" + + backend_to_class: ClassVar[dict[str, Any]] = { + "google": InternetGoogleRetriever, + "bing": InternetGoogleRetriever, # TODO: Implement BingRetriever + "xinyu": XinyuSearchRetriever, + } + + @classmethod + def from_config( + cls, config_factory: InternetRetrieverConfigFactory, embedder: BaseEmbedder + ) -> InternetGoogleRetriever | None: + """ + Create internet retriever from configuration. + + Args: + config_factory: Internet retriever configuration + embedder: Embedder instance for generating embeddings + + Returns: + InternetRetriever instance or None if no configuration provided + """ + if config_factory.backend is None: + return None + + backend = config_factory.backend + if backend not in cls.backend_to_class: + raise ValueError(f"Invalid internet retriever backend: {backend}") + + retriever_class = cls.backend_to_class[backend] + config = config_factory.config + + # Create retriever with appropriate parameters + if backend == "google": + return retriever_class( + api_key=config.api_key, + search_engine_id=config.search_engine_id, + embedder=embedder, + max_results=config.max_results, + num_per_request=config.num_per_request, + ) + elif backend == "bing": + # TODO: Implement Bing retriever + return retriever_class( + api_key=config.api_key, + search_engine_id=None, # Bing doesn't use search_engine_id + embedder=embedder, + max_results=config.max_results, + num_per_request=config.num_per_request, + ) + elif backend == "xinyu": + return retriever_class( + access_key=config.api_key, # Use api_key as access_key for xinyu + search_engine_id=config.search_engine_id, + embedder=embedder, + max_results=config.max_results, + ) + else: + raise ValueError(f"Unsupported backend: {backend}") + + @classmethod + def create_google_retriever( + cls, api_key: str, search_engine_id: str, embedder: BaseEmbedder + ) -> InternetGoogleRetriever: + """ + Create Google Custom Search retriever. 
+ + Args: + api_key: Google API key + search_engine_id: Google Custom Search Engine ID + embedder: Embedder instance + + Returns: + InternetRetriever instance + """ + return InternetGoogleRetriever(api_key, search_engine_id, embedder) diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/reasoner.py b/src/memos/memories/textual/tree_text_memory/retrieve/reasoner.py index a5790c36..35582c87 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/reasoner.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/reasoner.py @@ -34,10 +34,7 @@ def reason( """ prompt_template = Template(REASON_PROMPT) memory_detailed_str = "\n".join( - [ - f"[{m.id}] ({m.metadata.hierarchy_level}) {m.metadata.key}: {m.memory}" - for m in ranked_memories - ] + [f"[{m.id}] {m.metadata.key}: {m.memory}" for m in ranked_memories] ) prompt = prompt_template.substitute(task=query, detailed_memory_list=memory_detailed_str) diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py index 0c1509b6..9cb434b5 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py @@ -8,6 +8,7 @@ from memos.llms.factory import OllamaLLM, OpenAILLM from memos.memories.textual.item import SearchedTreeNodeTextualMemoryMetadata, TextualMemoryItem +from .internet_retriever_factory import InternetRetrieverFactory from .reasoner import MemoryReasoner from .recall import GraphMemoryRetriever from .reranker import MemoryReranker @@ -20,6 +21,7 @@ def __init__( dispatcher_llm: OpenAILLM | OllamaLLM, graph_store: Neo4jGraphDB, embedder: OllamaEmbedder, + internet_retriever: InternetRetrieverFactory | None = None, ): self.graph_store = graph_store self.embedder = embedder @@ -29,6 +31,9 @@ def __init__( self.reranker = MemoryReranker(dispatcher_llm, self.embedder) self.reasoner = MemoryReasoner(dispatcher_llm) + # Create internet retriever from config if provided + self.internet_retriever = internet_retriever + def search( self, query: str, top_k: int, info=None, mode: str = "fast", memory_type: str = "All" ) -> list[TextualMemoryItem]: @@ -50,7 +55,19 @@ def search( """ # Step 1: Parse task structure into topic, concept, and fact levels - parsed_goal = self.task_goal_parser.parse(query) + context = [] + if mode == "fine": + query_embedding = self.embedder.embed([query])[0] + related_node_ids = self.graph_store.search_by_embedding(query_embedding, top_k=top_k) + related_nodes = [ + self.graph_store.get_node(related_node["id"]) for related_node in related_node_ids + ] + + context = [related_node["memory"] for related_node in related_nodes] + context = list(set(context)) + + # Step 1a: Parse task structure into topic, concept, and fact levels + parsed_goal = self.task_goal_parser.parse(query, "\n".join(context)) if parsed_goal.memories: query_embedding = self.embedder.embed(list({query, *parsed_goal.memories})) @@ -114,14 +131,39 @@ def retrieve_ranked_long_term_and_user(): ) return ranked_memories - # Step 3: Parallel execution of both paths - with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + # Step 2c: Internet retrieval (Path C) + def retrieve_from_internet(): + """ + Retrieve information from the internet using Google Custom Search API. 
+ """ + if not self.internet_retriever: + return [] + if memory_type not in ["All"]: + return [] + internet_items = self.internet_retriever.retrieve_from_internet( + query=query, top_k=top_k, parsed_goal=parsed_goal + ) + + # Convert to the format expected by reranker + ranked_memories = self.reranker.rerank( + query=query, + query_embedding=query_embedding[0], + graph_results=internet_items, + top_k=top_k * 2, + parsed_goal=parsed_goal, + ) + return ranked_memories + + # Step 3: Parallel execution of all paths + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: future_working = executor.submit(retrieve_from_working_memory) future_hybrid = executor.submit(retrieve_ranked_long_term_and_user) + future_internet = executor.submit(retrieve_from_internet) working_results = future_working.result() hybrid_results = future_hybrid.result() - searched_res = working_results + hybrid_results + internet_results = future_internet.result() + searched_res = working_results + hybrid_results + internet_results # Deduplicate by item.memory, keep higher score deduped_result = {} diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/task_goal_parser.py b/src/memos/memories/textual/tree_text_memory/retrieve/task_goal_parser.py index 5af204f1..e14c94ad 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/task_goal_parser.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/task_goal_parser.py @@ -31,7 +31,7 @@ def parse(self, task_description: str, context: str = "") -> ParsedTaskGoal: elif self.mode == "fine": if not self.llm: raise ValueError("LLM not provided for slow mode.") - return self._parse_fine(task_description) + return self._parse_fine(task_description, context) else: raise ValueError(f"Unknown mode: {self.mode}") @@ -43,11 +43,11 @@ def _parse_fast(self, task_description: str, limit_num: int = 5) -> ParsedTaskGo memories=[task_description], keys=[task_description], tags=[], goal_type="default" ) - def _parse_fine(self, query: str) -> ParsedTaskGoal: + def _parse_fine(self, query: str, context: str = "") -> ParsedTaskGoal: """ Slow mode: LLM structured parse. 
""" - prompt = Template(TASK_PARSE_PROMPT).substitute(task=query.strip(), context="") + prompt = Template(TASK_PARSE_PROMPT).substitute(task=query.strip(), context=context) response = self.llm.generate(messages=[{"role": "user", "content": prompt}]) return self._parse_response(response) diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/xinyusearch.py b/src/memos/memories/textual/tree_text_memory/retrieve/xinyusearch.py new file mode 100644 index 00000000..b803dfa4 --- /dev/null +++ b/src/memos/memories/textual/tree_text_memory/retrieve/xinyusearch.py @@ -0,0 +1,335 @@ +"""Xinyu Search API retriever for tree text memory.""" + +import json +import uuid + +from datetime import datetime + +import requests + +from memos.embedders.factory import OllamaEmbedder +from memos.log import get_logger +from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata + + +logger = get_logger(__name__) + + +class XinyuSearchAPI: + """Xinyu Search API Client""" + + def __init__(self, access_key: str, search_engine_id: str, max_results: int = 20): + """ + Initialize Xinyu Search API client + + Args: + access_key: Xinyu API access key + max_results: Maximum number of results to retrieve + """ + self.access_key = access_key + self.max_results = max_results + + # API configuration + self.config = {"url": search_engine_id} + + self.headers = { + "User-Agent": "PostmanRuntime/7.39.0", + "Content-Type": "application/json", + "Accept": "*/*", + "Accept-Encoding": "gzip, deflate, br", + "Connection": "keep-alive", + "token": access_key, + } + + def query_detail(self, body: dict | None = None, detail: bool = True) -> list[dict]: + """ + Query Xinyu search API for detailed results + + Args: + body: Search parameters + detail: Whether to get detailed results + + Returns: + List of search results + """ + res = [] + try: + url = self.config["url"] + + params = json.dumps(body) + resp = requests.request("POST", url, headers=self.headers, data=params) + res = json.loads(resp.text)["results"] + + # If detail interface, return online part + if "search_type" in body: + res = res["online"] + + if not detail: + for res_i in res: + res_i["summary"] = "「SUMMARY」" + res_i.get("summary", "") + + except Exception: + import traceback + + logger.error(f"xinyu search error: {traceback.format_exc()}") + return res + + def search(self, query: str, max_results: int | None = None) -> list[dict]: + """ + Execute search request + + Args: + query: Search query + max_results: Maximum number of results to return + + Returns: + List of search results + """ + if max_results is None: + max_results = self.max_results + + body = { + "search_type": ["online"], + "online_search": { + "max_entries": max_results, + "cache_switch": False, + "baidu_field": {"switch": True, "mode": "relevance", "type": "page"}, + "bing_field": {"switch": False, "mode": "relevance", "type": "page_web"}, + "sogou_field": {"switch": False, "mode": "relevance", "type": "page"}, + }, + "request_id": "memos" + str(uuid.uuid4()), + "queries": query, + } + + return self.query_detail(body) + + +class XinyuSearchRetriever: + """Xinyu Search retriever that converts search results to TextualMemoryItem format""" + + def __init__( + self, + access_key: str, + search_engine_id: str, + embedder: OllamaEmbedder, + max_results: int = 20, + ): + """ + Initialize Xinyu search retriever + + Args: + access_key: Xinyu API access key + embedder: Embedder instance for generating embeddings + max_results: Maximum number of results to retrieve + """ + 
self.xinyu_api = XinyuSearchAPI(access_key, search_engine_id, max_results=max_results) + self.embedder = embedder + + def retrieve_from_internet( + self, query: str, top_k: int = 10, parsed_goal=None + ) -> list[TextualMemoryItem]: + """ + Retrieve information from Xinyu search and convert to TextualMemoryItem format + + Args: + query: Search query + top_k: Number of results to return + parsed_goal: Parsed task goal (optional) + + Returns: + List of TextualMemoryItem + """ + # Get search results + search_results = self.xinyu_api.search(query, max_results=top_k) + + # Convert to TextualMemoryItem format + memory_items = [] + + for _, result in enumerate(search_results): + # Extract basic information from Xinyu response format + title = result.get("title", "") + content = result.get("content", "") + summary = result.get("summary", "") + url = result.get("url", "") + publish_time = result.get("publish_time", "") + if publish_time: + try: + publish_time = datetime.strptime(publish_time, "%Y-%m-%d %H:%M:%S").strftime( + "%Y-%m-%d" + ) + except Exception as e: + logger.error(f"xinyu search error: {e}") + publish_time = datetime.now().strftime("%Y-%m-%d") + else: + publish_time = datetime.now().strftime("%Y-%m-%d") + source = result.get("source", "") + site = result.get("site", "") + if site: + site = site.split("|")[0] + + # Combine memory content + memory_content = ( + f"Title: {title}\nSummary: {summary}\nContent: {content[:200]}...\nSource: {url}" + ) + + # Create metadata + metadata = TreeNodeTextualMemoryMetadata( + user_id=None, + session_id=None, + status="activated", + type="fact", # Search results are usually factual information + memory_time=publish_time, + source="web", + confidence=85.0, # Confidence level for search information + entities=self._extract_entities(title, content, summary), + tags=self._extract_tags(title, content, summary, parsed_goal), + visibility="public", + memory_type="LongTermMemory", # Search results as working memory + key=title, + sources=[url] if url else [], + embedding=self.embedder.embed([memory_content])[0], + created_at=datetime.now().isoformat(), + usage=[], + background=f"Xinyu search result from {site or source}", + ) + # Create TextualMemoryItem + memory_item = TextualMemoryItem( + id=str(uuid.uuid4()), memory=memory_content, metadata=metadata + ) + + memory_items.append(memory_item) + + return memory_items + + def _extract_entities(self, title: str, content: str, summary: str) -> list[str]: + """ + Extract entities from title, content and summary + + Args: + title: Article title + content: Article content + summary: Article summary + + Returns: + List of extracted entities + """ + # Simple entity extraction - can be enhanced with NER + text = f"{title} {content} {summary}" + entities = [] + + # Extract potential entities (simple approach) + # This can be enhanced with proper NER models + words = text.split() + for word in words: + if len(word) > 2 and word[0].isupper(): + entities.append(word) + + return list(set(entities))[:10] # Limit to 10 entities + + def _extract_tags(self, title: str, content: str, summary: str, parsed_goal=None) -> list[str]: + """ + Extract tags from title, content and summary + + Args: + title: Article title + content: Article content + summary: Article summary + parsed_goal: Parsed task goal (optional) + + Returns: + List of extracted tags + """ + tags = [] + + # Add source-based tags + tags.append("xinyu_search") + tags.append("news") + + # Add content-based tags + text = f"{title} {content} {summary}".lower() + + # 
Simple keyword-based tagging + keywords = { + "economy": [ + "economy", + "GDP", + "growth", + "production", + "industry", + "investment", + "consumption", + "market", + "trade", + "finance", + ], + "politics": [ + "politics", + "government", + "policy", + "meeting", + "leader", + "election", + "parliament", + "ministry", + ], + "technology": [ + "technology", + "tech", + "innovation", + "digital", + "internet", + "AI", + "artificial intelligence", + "software", + "hardware", + ], + "sports": [ + "sports", + "game", + "athlete", + "olympic", + "championship", + "tournament", + "team", + "player", + ], + "culture": [ + "culture", + "education", + "art", + "history", + "literature", + "music", + "film", + "museum", + ], + "health": [ + "health", + "medical", + "pandemic", + "hospital", + "doctor", + "medicine", + "disease", + "treatment", + ], + "environment": [ + "environment", + "ecology", + "pollution", + "green", + "climate", + "sustainability", + "renewable", + ], + } + + for category, words in keywords.items(): + if any(word in text for word in words): + tags.append(category) + + # Add goal-based tags if available + if parsed_goal and hasattr(parsed_goal, "tags"): + tags.extend(parsed_goal.tags) + + return list(set(tags))[:15] # Limit to 15 tags diff --git a/src/memos/templates/mos_prompts.py b/src/memos/templates/mos_prompts.py new file mode 100644 index 00000000..7f41f7dd --- /dev/null +++ b/src/memos/templates/mos_prompts.py @@ -0,0 +1,63 @@ +COT_DECOMPOSE_PROMPT = """ +I am an 8-year-old student who needs help analyzing and breaking down complex questions. Your task is to help me understand whether a question is complex enough to be broken down into smaller parts. + +Requirements: +1. First, determine if the question is a decomposable problem. If it is a decomposable problem, set 'is_complex' to True. +2. If the question needs to be decomposed, break it down into 1-3 sub-questions. The number should be controlled by the model based on the complexity of the question. +3. For decomposable questions, break them down into sub-questions and put them in the 'sub_questions' list. Each sub-question should contain only one question content without any additional notes. +4. If the question is not a decomposable problem, set 'is_complex' to False and set 'sub_questions' to an empty list. +5. You must return ONLY a valid JSON object. Do not include any other text, explanations, or formatting. + +Here are some examples: + +Question: Who is the current head coach of the gymnastics team in the capital of the country that Lang Ping represents? +Answer: {{"is_complex": true, "sub_questions": ["Which country does Lang Ping represent in volleyball?", "What is the capital of this country?", "Who is the current head coach of the gymnastics team in this capital?"]}} + +Question: Which country's cultural heritage is the Great Wall? +Answer: {{"is_complex": false, "sub_questions": []}} + +Question: How did the trade relationship between Madagascar and China develop, and how does this relationship affect the market expansion of the essential oil industry on Nosy Be Island? 
+Answer: {{"is_complex": true, "sub_questions": ["How did the trade relationship between Madagascar and China develop?", "How does this trade relationship affect the market expansion of the essential oil industry on Nosy Be Island?"]}} + +Please analyze the following question and respond with ONLY a valid JSON object: +Question: {query} +Answer:""" + +PRO_MODE_WELCOME_MESSAGE = """ +============================================================ +🚀 MemOS PRO Mode Activated! +============================================================ +✅ Chain of Thought (CoT) enhancement is now enabled by default +✅ Complex queries will be automatically decomposed and enhanced + +🌐 To enable Internet search capabilities: + 1. Go to your cube's textual memory configuration + 2. Set the backend to 'google' in the internet_retriever section + 3. Configure the following parameters: + - api_key: Your Google Search API key + - cse_id: Your Custom Search Engine ID + - num_results: Number of search results (default: 5) + +📝 Example configuration at cube config for tree_text_memory : + internet_retriever: + backend: 'google' + config: + api_key: 'your_google_api_key_here' + cse_id: 'your_custom_search_engine_id' + num_results: 5 +details: https://github.com/memos-ai/memos/blob/main/examples/core_memories/tree_textual_w_internet_memoy.py +============================================================ +""" + +SYNTHESIS_PROMPT = """ +exclude memory information, synthesizing information from multiple sources to provide comprehensive answers. +I will give you chain of thought for sub-questions and their answers. +Sub-questions and their answers: +{qa_text} + +Please synthesize these answers into a comprehensive response that: +1. Addresses the original question completely +2. Integrates information from all sub-questions +3. Provides clear reasoning and connections +4. Is well-structured and easy to understand +5. Maintains a natural conversational tone"""