From 31e7ed79f59fef56c685702de07e7e6bbead7d7f Mon Sep 17 00:00:00 2001 From: chentang Date: Fri, 25 Jul 2025 17:47:23 +0800 Subject: [PATCH 1/5] rebase to address conflicts --- src/memos/mem_scheduler/base_scheduler.py | 40 +++++++++++++++---- .../general_modules/retriever.py | 20 ++-------- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 1daf87b4..3efa27ad 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -139,7 +139,7 @@ def _set_current_context_from_message(self, msg: ScheduleMessageItem) -> None: self.current_mem_cube_id = msg.mem_cube_id self.current_mem_cube = msg.mem_cube - def transform_working_memories_to_monitors( + def transform_memories_to_monitors( self, query_keywords, memories: list[TextualMemoryItem] ) -> list[MemoryMonitorItem]: """ @@ -200,8 +200,9 @@ def replace_working_memory( text_mem_base: TreeTextMemory = text_mem_base # process rerank memories with llm - query_monitor = self.monitor.query_monitors[user_id][mem_cube_id] - query_history = query_monitor.get_queries_with_timesort() + query_history = self.monitor.query_monitors[user_id][ + mem_cube_id + ].get_queries_with_timesort() memories_with_new_order, rerank_success_flag = ( self.retriever.process_and_rerank_memories( queries=query_history, @@ -212,11 +213,13 @@ def replace_working_memory( ) # update working memory monitors - query_keywords = query_monitor.get_keywords_collections() + query_keywords = self.monitor.query_monitors[user_id][ + mem_cube_id + ].get_keywords_collections() logger.debug( f"Processing {len(memories_with_new_order)} memories with {len(query_keywords)} query keywords" ) - new_working_memory_monitors = self.transform_working_memories_to_monitors( + new_working_memory_monitors = self.transform_memories_to_monitors( query_keywords=query_keywords, memories=memories_with_new_order, ) @@ -256,6 +259,25 @@ def replace_working_memory( return memories_with_new_order + def initialize_working_memory_monitors( + self, + user_id: UserID | str, + mem_cube_id: MemCubeID | str, + mem_cube: GeneralMemCube, + ): + text_mem_base: TreeTextMemory = mem_cube.text_mem + working_memories = text_mem_base.get_working_memory() + + working_memory_monitors = self.transform_memories_to_monitors( + memories=working_memories, + ) + self.monitor.update_working_memory_monitors( + new_working_memory_monitors=working_memory_monitors, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + ) + def update_activation_memory( self, new_memories: list[str | TextualMemoryItem], @@ -359,9 +381,13 @@ def update_activation_memory_periodically( or len(self.monitor.working_memory_monitors[user_id][mem_cube_id].memories) == 0 ): logger.warning( - "No memories found in working_memory_monitors, activation memory update is skipped" + "No memories found in working_memory_monitors, initializing from current working_memories" + ) + self.initialize_working_memory_monitors( + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, ) - return self.monitor.update_activation_memory_monitors( user_id=user_id, mem_cube_id=mem_cube_id, mem_cube=mem_cube diff --git a/src/memos/mem_scheduler/general_modules/retriever.py b/src/memos/mem_scheduler/general_modules/retriever.py index 3732078d..562a6f8f 100644 --- a/src/memos/mem_scheduler/general_modules/retriever.py +++ b/src/memos/mem_scheduler/general_modules/retriever.py @@ -33,12 +33,7 @@ def __init__(self, process_llm: BaseLLM, config: BaseSchedulerConfig): self.process_llm = process_llm def search( - self, - query: str, - mem_cube: GeneralMemCube, - top_k: int, - method: str = TreeTextMemory_SEARCH_METHOD, - info: dict | None = None, + self, query: str, mem_cube: GeneralMemCube, top_k: int, method=TreeTextMemory_SEARCH_METHOD ) -> list[TextualMemoryItem]: """Search in text memory with the given query. @@ -52,21 +47,14 @@ def search( """ text_mem_base = mem_cube.text_mem try: - if method in [TreeTextMemory_SEARCH_METHOD, TreeTextMemory_FINE_SEARCH_METHOD]: + if method == [TreeTextMemory_SEARCH_METHOD, TreeTextMemory_FINE_SEARCH_METHOD]: assert isinstance(text_mem_base, TreeTextMemory) - if info is None: - logger.warning( - "Please input 'info' when use tree.search so that " - "the database would store the consume history." - ) - info = {"user_id": "", "session_id": ""} - mode = "fast" if method == TreeTextMemory_SEARCH_METHOD else "fine" results_long_term = text_mem_base.search( - query=query, top_k=top_k, memory_type="LongTermMemory", mode=mode, info=info + query=query, top_k=top_k, memory_type="LongTermMemory", mode=mode ) results_user = text_mem_base.search( - query=query, top_k=top_k, memory_type="UserMemory", mode=mode, info=info + query=query, top_k=top_k, memory_type="UserMemory", mode=mode ) results = results_long_term + results_user else: From a13feec84f0becf9bba3cc75e43646c5fac6bd2b Mon Sep 17 00:00:00 2001 From: chentang Date: Tue, 29 Jul 2025 20:43:09 +0800 Subject: [PATCH 2/5] fix bugs: fix a bug in retriever, and add new auth info for neo4j db --- src/memos/mem_scheduler/general_modules/retriever.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/mem_scheduler/general_modules/retriever.py b/src/memos/mem_scheduler/general_modules/retriever.py index 562a6f8f..4a746334 100644 --- a/src/memos/mem_scheduler/general_modules/retriever.py +++ b/src/memos/mem_scheduler/general_modules/retriever.py @@ -47,7 +47,7 @@ def search( """ text_mem_base = mem_cube.text_mem try: - if method == [TreeTextMemory_SEARCH_METHOD, TreeTextMemory_FINE_SEARCH_METHOD]: + if method in [TreeTextMemory_SEARCH_METHOD, TreeTextMemory_FINE_SEARCH_METHOD]: assert isinstance(text_mem_base, TreeTextMemory) mode = "fast" if method == TreeTextMemory_SEARCH_METHOD else "fine" results_long_term = text_mem_base.search( From a9fb924df451e8492acc0beae283c8acf9ad3605 Mon Sep 17 00:00:00 2001 From: chentang Date: Thu, 31 Jul 2025 20:05:11 +0800 Subject: [PATCH 3/5] fix bugs & new feat: fix bugs in mem_scheduler examples, and remove initialize working memories (logically uneccessary). change the function parameters of search as the function input info as an addition --- src/memos/mem_scheduler/base_scheduler.py | 40 ++++--------------- .../general_modules/retriever.py | 18 +++++++-- 2 files changed, 22 insertions(+), 36 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 3efa27ad..1daf87b4 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -139,7 +139,7 @@ def _set_current_context_from_message(self, msg: ScheduleMessageItem) -> None: self.current_mem_cube_id = msg.mem_cube_id self.current_mem_cube = msg.mem_cube - def transform_memories_to_monitors( + def transform_working_memories_to_monitors( self, query_keywords, memories: list[TextualMemoryItem] ) -> list[MemoryMonitorItem]: """ @@ -200,9 +200,8 @@ def replace_working_memory( text_mem_base: TreeTextMemory = text_mem_base # process rerank memories with llm - query_history = self.monitor.query_monitors[user_id][ - mem_cube_id - ].get_queries_with_timesort() + query_monitor = self.monitor.query_monitors[user_id][mem_cube_id] + query_history = query_monitor.get_queries_with_timesort() memories_with_new_order, rerank_success_flag = ( self.retriever.process_and_rerank_memories( queries=query_history, @@ -213,13 +212,11 @@ def replace_working_memory( ) # update working memory monitors - query_keywords = self.monitor.query_monitors[user_id][ - mem_cube_id - ].get_keywords_collections() + query_keywords = query_monitor.get_keywords_collections() logger.debug( f"Processing {len(memories_with_new_order)} memories with {len(query_keywords)} query keywords" ) - new_working_memory_monitors = self.transform_memories_to_monitors( + new_working_memory_monitors = self.transform_working_memories_to_monitors( query_keywords=query_keywords, memories=memories_with_new_order, ) @@ -259,25 +256,6 @@ def replace_working_memory( return memories_with_new_order - def initialize_working_memory_monitors( - self, - user_id: UserID | str, - mem_cube_id: MemCubeID | str, - mem_cube: GeneralMemCube, - ): - text_mem_base: TreeTextMemory = mem_cube.text_mem - working_memories = text_mem_base.get_working_memory() - - working_memory_monitors = self.transform_memories_to_monitors( - memories=working_memories, - ) - self.monitor.update_working_memory_monitors( - new_working_memory_monitors=working_memory_monitors, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=mem_cube, - ) - def update_activation_memory( self, new_memories: list[str | TextualMemoryItem], @@ -381,13 +359,9 @@ def update_activation_memory_periodically( or len(self.monitor.working_memory_monitors[user_id][mem_cube_id].memories) == 0 ): logger.warning( - "No memories found in working_memory_monitors, initializing from current working_memories" - ) - self.initialize_working_memory_monitors( - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=mem_cube, + "No memories found in working_memory_monitors, activation memory update is skipped" ) + return self.monitor.update_activation_memory_monitors( user_id=user_id, mem_cube_id=mem_cube_id, mem_cube=mem_cube diff --git a/src/memos/mem_scheduler/general_modules/retriever.py b/src/memos/mem_scheduler/general_modules/retriever.py index 4a746334..3732078d 100644 --- a/src/memos/mem_scheduler/general_modules/retriever.py +++ b/src/memos/mem_scheduler/general_modules/retriever.py @@ -33,7 +33,12 @@ def __init__(self, process_llm: BaseLLM, config: BaseSchedulerConfig): self.process_llm = process_llm def search( - self, query: str, mem_cube: GeneralMemCube, top_k: int, method=TreeTextMemory_SEARCH_METHOD + self, + query: str, + mem_cube: GeneralMemCube, + top_k: int, + method: str = TreeTextMemory_SEARCH_METHOD, + info: dict | None = None, ) -> list[TextualMemoryItem]: """Search in text memory with the given query. @@ -49,12 +54,19 @@ def search( try: if method in [TreeTextMemory_SEARCH_METHOD, TreeTextMemory_FINE_SEARCH_METHOD]: assert isinstance(text_mem_base, TreeTextMemory) + if info is None: + logger.warning( + "Please input 'info' when use tree.search so that " + "the database would store the consume history." + ) + info = {"user_id": "", "session_id": ""} + mode = "fast" if method == TreeTextMemory_SEARCH_METHOD else "fine" results_long_term = text_mem_base.search( - query=query, top_k=top_k, memory_type="LongTermMemory", mode=mode + query=query, top_k=top_k, memory_type="LongTermMemory", mode=mode, info=info ) results_user = text_mem_base.search( - query=query, top_k=top_k, memory_type="UserMemory", mode=mode + query=query, top_k=top_k, memory_type="UserMemory", mode=mode, info=info ) results = results_long_term + results_user else: From 415fafdeadd71dea9bb6c6878024255309654c36 Mon Sep 17 00:00:00 2001 From: chentang Date: Wed, 6 Aug 2025 19:30:01 +0800 Subject: [PATCH 4/5] fix bugs: modify configs, examples, schedule handlers of mem_scheduler, and rename some variables --- evaluation/scripts/run_locomo_eval.sh | 12 ++--- .../general_scheduler_config.yaml | 4 +- .../config/mem_scheduler/mem_chat_config.yaml | 15 ------ .../memos_config_w_scheduler.yaml | 4 +- .../memos_config_w_scheduler_and_openai.yaml | 5 +- .../memos_config_wo_scheduler.yaml | 37 ------------- examples/mem_os/chat_w_scheduler.py | 54 ++++++++++++------- examples/mem_scheduler/memos_w_scheduler.py | 2 +- .../memos_w_scheduler_for_test.py | 2 +- examples/mem_scheduler/rabbitmq_example.py | 2 +- .../mem_scheduler/try_schedule_modules.py | 2 +- src/memos/api/config.py | 2 +- src/memos/configs/mem_scheduler.py | 51 ++++++++++++------ src/memos/mem_os/main.py | 2 +- src/memos/mem_os/product.py | 2 +- src/memos/mem_os/utils/default_config.py | 2 +- src/memos/mem_scheduler/base_scheduler.py | 35 ++++++++---- .../general_modules/rabbitmq_service.py | 4 +- .../general_modules/scheduler_logger.py | 12 +++-- src/memos/mem_scheduler/general_scheduler.py | 52 +++++++++++------- .../monitors/dispatcher_monitor.py | 27 ++++++++-- .../mem_scheduler/monitors/general_monitor.py | 8 ++- .../mem_scheduler/schemas/general_schemas.py | 4 +- .../mem_scheduler/schemas/message_schemas.py | 1 + src/memos/mem_scheduler/utils/misc_utils.py | 45 +++++++++++++++- src/memos/memories/activation/item.py | 2 +- src/memos/templates/mem_scheduler_prompts.py | 16 ++++-- 27 files changed, 250 insertions(+), 154 deletions(-) delete mode 100644 examples/data/config/mem_scheduler/mem_chat_config.yaml delete mode 100644 examples/data/config/mem_scheduler/memos_config_wo_scheduler.yaml diff --git a/evaluation/scripts/run_locomo_eval.sh b/evaluation/scripts/run_locomo_eval.sh index 89a09729..d9c13a1a 100755 --- a/evaluation/scripts/run_locomo_eval.sh +++ b/evaluation/scripts/run_locomo_eval.sh @@ -6,12 +6,12 @@ VERSION="072001" WORKERS=10 TOPK=20 -# echo "Running locomo_ingestion.py..." -# CUDA_VISIBLE_DEVICES=0 python scripts/locomo/locomo_ingestion.py --lib $LIB --version $VERSION --workers $WORKERS -# if [ $? -ne 0 ]; then -# echo "Error running locomo_ingestion.py" -# exit 1 -# fi + echo "Running locomo_ingestion.py..." + CUDA_VISIBLE_DEVICES=0 python scripts/locomo/locomo_ingestion.py --lib $LIB --version $VERSION --workers $WORKERS + if [ $? -ne 0 ]; then + echo "Error running locomo_ingestion.py" + exit 1 + fi echo "Running locomo_search.py..." CUDA_VISIBLE_DEVICES=0 python scripts/locomo/locomo_search.py --lib $LIB --version $VERSION --top_k $TOPK --workers $WORKERS diff --git a/examples/data/config/mem_scheduler/general_scheduler_config.yaml b/examples/data/config/mem_scheduler/general_scheduler_config.yaml index 39065590..2360bb14 100644 --- a/examples/data/config/mem_scheduler/general_scheduler_config.yaml +++ b/examples/data/config/mem_scheduler/general_scheduler_config.yaml @@ -1,9 +1,11 @@ backend: general_scheduler config: top_k: 10 - top_n: 10 act_mem_update_interval: 30 context_window_size: 10 thread_pool_max_workers: 5 consume_interval_seconds: 1 + working_mem_monitor_capacity: 20 + activation_mem_monitor_capacity: 5 enable_parallel_dispatch: true + enable_activation_memory: true diff --git a/examples/data/config/mem_scheduler/mem_chat_config.yaml b/examples/data/config/mem_scheduler/mem_chat_config.yaml deleted file mode 100644 index 6c98570e..00000000 --- a/examples/data/config/mem_scheduler/mem_chat_config.yaml +++ /dev/null @@ -1,15 +0,0 @@ -backend: simple -config: - user_id: "user_123" - chat_llm: - backend: huggingface - config: - model_name_or_path: "Qwen/Qwen3-1.7B" - temperature: 0.1 - remove_think_prefix: true - max_tokens: 4096 - max_turns_window: 20 - top_k: 5 - enable_textual_memory: true - enable_activation_memory: false - enable_parametric_memory: false diff --git a/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml b/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml index e898c826..a851ba77 100644 --- a/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml +++ b/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml @@ -33,12 +33,14 @@ mem_scheduler: backend: "general_scheduler" config: top_k: 10 - top_n: 10 act_mem_update_interval: 30 context_window_size: 10 thread_pool_max_workers: 10 consume_interval_seconds: 1 + working_mem_monitor_capacity: 20 + activation_mem_monitor_capacity: 5 enable_parallel_dispatch: true + enable_activation_memory: true max_turns_window: 20 top_k: 5 enable_textual_memory: true diff --git a/examples/data/config/mem_scheduler/memos_config_w_scheduler_and_openai.yaml b/examples/data/config/mem_scheduler/memos_config_w_scheduler_and_openai.yaml index b329e0fc..b82dbd2b 100644 --- a/examples/data/config/mem_scheduler/memos_config_w_scheduler_and_openai.yaml +++ b/examples/data/config/mem_scheduler/memos_config_w_scheduler_and_openai.yaml @@ -35,13 +35,14 @@ mem_scheduler: backend: "general_scheduler" config: top_k: 10 - top_n: 10 act_mem_update_interval: 30 context_window_size: 10 thread_pool_max_workers: 10 consume_interval_seconds: 1 + working_mem_monitor_capacity: 20 + activation_mem_monitor_capacity: 5 enable_parallel_dispatch: true - enable_act_memory_update: false + enable_activation_memory: true max_turns_window: 20 top_k: 5 enable_textual_memory: true diff --git a/examples/data/config/mem_scheduler/memos_config_wo_scheduler.yaml b/examples/data/config/mem_scheduler/memos_config_wo_scheduler.yaml deleted file mode 100644 index b451fb44..00000000 --- a/examples/data/config/mem_scheduler/memos_config_wo_scheduler.yaml +++ /dev/null @@ -1,37 +0,0 @@ -user_id: "root" -chat_model: - backend: "huggingface" - config: - model_name_or_path: "Qwen/Qwen3-1.7B" - temperature: 0.1 - remove_think_prefix: true - max_tokens: 4096 -mem_reader: - backend: "simple_struct" - config: - llm: - backend: "ollama" - config: - model_name_or_path: "qwen3:0.6b" - remove_think_prefix: true - temperature: 0.8 - max_tokens: 1024 - top_p: 0.9 - top_k: 50 - embedder: - backend: "ollama" - config: - model_name_or_path: "nomic-embed-text:latest" - chunker: - backend: "sentence" - config: - tokenizer_or_token_counter: "gpt2" - chunk_size: 512 - chunk_overlap: 128 - min_sentences_per_chunk: 1 -max_turns_window: 20 -top_k: 5 -enable_textual_memory: true -enable_activation_memory: false -enable_parametric_memory: false -enable_mem_scheduler: false diff --git a/examples/mem_os/chat_w_scheduler.py b/examples/mem_os/chat_w_scheduler.py index c5d84efc..6810fe5e 100644 --- a/examples/mem_os/chat_w_scheduler.py +++ b/examples/mem_os/chat_w_scheduler.py @@ -1,40 +1,61 @@ import shutil -import uuid +import sys from pathlib import Path from memos.configs.mem_cube import GeneralMemCubeConfig from memos.configs.mem_os import MOSConfig +from memos.configs.mem_scheduler import AuthConfig from memos.mem_cube.general import GeneralMemCube from memos.mem_os.main import MOS -from memos.mem_scheduler.utils.misc_utils import parse_yaml -# init MOS -config = parse_yaml("./examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml") +FILE_PATH = Path(__file__).absolute() +BASE_DIR = FILE_PATH.parent.parent.parent +sys.path.insert(0, str(BASE_DIR)) # Enable execution from any working directory -mos_config = MOSConfig(**config) -mos = MOS(mos_config) -# create user -user_id = str(uuid.uuid4()) -mos.create_user(user_id=user_id) +# set configs +mos_config = MOSConfig.from_yaml_file( + f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler_and_openai.yaml" +) -config = GeneralMemCubeConfig.from_yaml_file( - "./examples/data/config/mem_scheduler/mem_cube_config.yaml" +mem_cube_config = GeneralMemCubeConfig.from_yaml_file( + f"{BASE_DIR}/examples/data/config/mem_scheduler/mem_cube_config.yaml" ) + +# default local graphdb uri +if AuthConfig.default_config_exists(): + auth_config = AuthConfig.from_local_config() + + mos_config.mem_reader.config.llm.config.api_key = auth_config.openai.api_key + mos_config.mem_reader.config.llm.config.api_base = auth_config.openai.base_url + + mem_cube_config.text_mem.config.graph_db.config.uri = auth_config.graph_db.uri + mem_cube_config.text_mem.config.graph_db.config.user = auth_config.graph_db.user + mem_cube_config.text_mem.config.graph_db.config.password = auth_config.graph_db.password + mem_cube_config.text_mem.config.graph_db.config.db_name = auth_config.graph_db.db_name + mem_cube_config.text_mem.config.graph_db.config.auto_create = auth_config.graph_db.auto_create + +# Initialization +mos = MOS(mos_config) + +user_id = "user_1" +mos.create_user(user_id) + mem_cube_id = "mem_cube_5" -mem_cube_name_or_path = f"./outputs/mem_scheduler/{user_id}/{mem_cube_id}" +mem_cube_name_or_path = f"{BASE_DIR}/outputs/mem_scheduler/{user_id}/{mem_cube_id}" + if Path(mem_cube_name_or_path).exists(): shutil.rmtree(mem_cube_name_or_path) print(f"{mem_cube_name_or_path} is not empty, and has been removed.") -mem_cube = GeneralMemCube(config) +mem_cube = GeneralMemCube(mem_cube_config) mem_cube.dump(mem_cube_name_or_path) - mos.register_mem_cube( mem_cube_name_or_path=mem_cube_name_or_path, mem_cube_id=mem_cube_id, user_id=user_id ) + messages = [ {"role": "user", "content": "I like playing football."}, {"role": "assistant", "content": "I like playing football too."}, @@ -51,8 +72,3 @@ for node in retrieved_memories["text_mem"][0]["memories"]["nodes"]: if node["metadata"]["memory_type"] == "WorkingMemory": print(f"🤖 [Assistant]working mem : {node['memory']}\n") - if retrieved_memories["act_mem"][0]["memories"]: - for act_mem in retrieved_memories["act_mem"][0]["memories"]: - print(f"🤖 [Assistant]act_mem: {act_mem['memory']}\n") - else: - print("🤖 [Assistant]act_mem: None\n") diff --git a/examples/mem_scheduler/memos_w_scheduler.py b/examples/mem_scheduler/memos_w_scheduler.py index e18aa9d9..63054586 100644 --- a/examples/mem_scheduler/memos_w_scheduler.py +++ b/examples/mem_scheduler/memos_w_scheduler.py @@ -85,7 +85,7 @@ def run_with_scheduler_init(): # default local graphdb uri if AuthConfig.default_config_exists(): - auth_config = AuthConfig.from_local_yaml() + auth_config = AuthConfig.from_local_config() mos_config.mem_reader.config.llm.config.api_key = auth_config.openai.api_key mos_config.mem_reader.config.llm.config.api_base = auth_config.openai.base_url diff --git a/examples/mem_scheduler/memos_w_scheduler_for_test.py b/examples/mem_scheduler/memos_w_scheduler_for_test.py index 28f3c3b3..074400ee 100644 --- a/examples/mem_scheduler/memos_w_scheduler_for_test.py +++ b/examples/mem_scheduler/memos_w_scheduler_for_test.py @@ -97,7 +97,7 @@ def init_task(): # default local graphdb uri if AuthConfig.default_config_exists(): - auth_config = AuthConfig.from_local_yaml() + auth_config = AuthConfig.from_local_config() mos_config.mem_reader.config.llm.config.api_key = auth_config.openai.api_key mos_config.mem_reader.config.llm.config.api_base = auth_config.openai.base_url diff --git a/examples/mem_scheduler/rabbitmq_example.py b/examples/mem_scheduler/rabbitmq_example.py index 6b04111a..ba573238 100644 --- a/examples/mem_scheduler/rabbitmq_example.py +++ b/examples/mem_scheduler/rabbitmq_example.py @@ -21,7 +21,7 @@ def main(): print("Please set configs for rabbitmq.") return else: - rabbitmq_module.initialize_rabbitmq(config=AuthConfig.from_local_yaml().rabbitmq) + rabbitmq_module.initialize_rabbitmq(config=AuthConfig.from_local_config().rabbitmq) try: rabbitmq_module.wait_for_connection_ready() diff --git a/examples/mem_scheduler/try_schedule_modules.py b/examples/mem_scheduler/try_schedule_modules.py index 29b166bf..8c5d1415 100644 --- a/examples/mem_scheduler/try_schedule_modules.py +++ b/examples/mem_scheduler/try_schedule_modules.py @@ -145,7 +145,7 @@ def show_web_logs(mem_scheduler: GeneralScheduler): # default local graphdb uri if AuthConfig.default_config_exists(): - auth_config = AuthConfig.from_local_yaml() + auth_config = AuthConfig.from_local_config() mos_config.mem_reader.config.llm.config.api_key = auth_config.openai.api_key mos_config.mem_reader.config.llm.config.api_base = auth_config.openai.base_url diff --git a/src/memos/api/config.py b/src/memos/api/config.py index 382f5d48..e0ef62a4 100644 --- a/src/memos/api/config.py +++ b/src/memos/api/config.py @@ -266,7 +266,7 @@ def get_scheduler_config() -> dict[str, Any]: "MOS_SCHEDULER_ENABLE_PARALLEL_DISPATCH", "true" ).lower() == "true", - "enable_act_memory_update": True, + "enable_activation_memory": True, }, } diff --git a/src/memos/configs/mem_scheduler.py b/src/memos/configs/mem_scheduler.py index c1166a03..f031b95e 100644 --- a/src/memos/configs/mem_scheduler.py +++ b/src/memos/configs/mem_scheduler.py @@ -21,8 +21,6 @@ class BaseSchedulerConfig(BaseConfig): top_k: int = Field( default=10, description="Number of top candidates to consider in initial retrieval" ) - # TODO: The 'top_n' field is deprecated and will be removed in future versions. - top_n: int = Field(default=5, description="Number of final results to return after processing") enable_parallel_dispatch: bool = Field( default=True, description="Whether to enable parallel message processing using thread pool" ) @@ -55,9 +53,15 @@ class GeneralSchedulerConfig(BaseSchedulerConfig): default=DEFAULT_ACT_MEM_DUMP_PATH, # Replace with DEFAULT_ACT_MEM_DUMP_PATH description="File path for dumping activation memory", ) - enable_act_memory_update: bool = Field( + enable_activation_memory: bool = Field( default=False, description="Whether to enable automatic activation memory updates" ) + working_mem_monitor_capacity: int = Field( + default=30, description="Capacity of the working memory monitor" + ) + activation_mem_monitor_capacity: int = Field( + default=20, description="Capacity of the activation memory monitor" + ) class SchedulerConfigFactory(BaseConfig): @@ -137,29 +141,46 @@ class AuthConfig(BaseConfig, DictConversionMixin): ) @classmethod - def from_local_yaml(cls, config_path: str | None = None) -> "AuthConfig": + def from_local_config(cls, config_path: str | Path | None = None) -> "AuthConfig": """ - Load configuration from YAML file + Load configuration from either a YAML or JSON file based on file extension. + + Automatically detects file type (YAML or JSON) from the file extension + and uses the appropriate parser. If no path is provided, uses the default + configuration path (YAML) or its JSON counterpart. Args: - config_path: Path to YAML configuration file + config_path: Optional path to configuration file. + If not provided, uses default configuration path. Returns: - AuthConfig instance + AuthConfig instance populated with data from the configuration file. Raises: - FileNotFoundError: If config file doesn't exist - ValueError: If YAML parsing or validation fails + FileNotFoundError: If the specified or default configuration file does not exist. + ValueError: If file extension is not .yaml/.yml or .json, or if parsing fails. """ - + # Determine config path if config_path is None: config_path = cls.default_config_path - # Check file exists - if not Path(config_path).exists(): - raise FileNotFoundError(f"Config file not found: {config_path}") - - return cls.from_yaml_file(yaml_path=config_path) + # Validate file existence + config_path_obj = Path(config_path) + if not config_path_obj.exists(): + raise FileNotFoundError(f"Configuration file not found: {config_path}") + + # Get file extension and determine parser + file_ext = config_path_obj.suffix.lower() + + if file_ext in (".yaml", ".yml"): + return cls.from_yaml_file(yaml_path=str(config_path_obj)) + elif file_ext == ".json": + return cls.from_json_file(json_path=str(config_path_obj)) + else: + raise ValueError( + f"Unsupported file format: {file_ext}. " + "Please use YAML (.yaml, .yml) or JSON (.json) files." + ) def set_openai_config_to_environment(self): # Set environment variables diff --git a/src/memos/mem_os/main.py b/src/memos/mem_os/main.py index 2cead7c8..2520c8fd 100644 --- a/src/memos/mem_os/main.py +++ b/src/memos/mem_os/main.py @@ -219,7 +219,7 @@ def _chat_with_cot_enhancement( mem_cube=mem_cube, label=ANSWER_LABEL, content=enhanced_response, - timestamp=datetime.now(), + timestamp=datetime.now().isoformat(), ) self.mem_scheduler.submit_messages(messages=[message_item]) diff --git a/src/memos/mem_os/product.py b/src/memos/mem_os/product.py index 53895c5e..8224f08b 100644 --- a/src/memos/mem_os/product.py +++ b/src/memos/mem_os/product.py @@ -491,7 +491,7 @@ def _send_message_to_scheduler( mem_cube=self.mem_cubes[mem_cube_id], label=label, content=query, - timestamp=datetime.now(), + timestamp=datetime.now().isoformat(), ) self.mem_scheduler.submit_messages(messages=[message_item]) diff --git a/src/memos/mem_os/utils/default_config.py b/src/memos/mem_os/utils/default_config.py index e64bf236..967654d8 100644 --- a/src/memos/mem_os/utils/default_config.py +++ b/src/memos/mem_os/utils/default_config.py @@ -112,7 +112,7 @@ def get_default_config( "thread_pool_max_workers": kwargs.get("scheduler_thread_pool_max_workers", 10), "consume_interval_seconds": kwargs.get("scheduler_consume_interval_seconds", 3), "enable_parallel_dispatch": kwargs.get("scheduler_enable_parallel_dispatch", True), - "enable_act_memory_update": True, + "enable_activation_memory": True, }, } diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 1daf87b4..44bc7da3 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -53,7 +53,7 @@ def __init__(self, config: BaseSchedulerConfig): # hyper-parameters self.top_k = self.config.get("top_k", 10) self.context_window_size = self.config.get("context_window_size", 5) - self.enable_act_memory_update = self.config.get("enable_act_memory_update", False) + self.enable_activation_memory = self.config.get("enable_activation_memory", False) self.act_mem_dump_path = self.config.get("act_mem_dump_path", DEFAULT_ACT_MEM_DUMP_PATH) self.search_method = TreeTextMemory_SEARCH_METHOD self.enable_parallel_dispatch = self.config.get("enable_parallel_dispatch", False) @@ -63,7 +63,7 @@ def __init__(self, config: BaseSchedulerConfig): self.retriever: SchedulerRetriever | None = None self.monitor: SchedulerGeneralMonitor | None = None - self.thread_pool_monitor: SchedulerDispatcherMonitor | None = None + self.dispatcher_monitor: SchedulerDispatcherMonitor | None = None self.dispatcher = SchedulerDispatcher( max_workers=self.thread_pool_max_workers, enable_parallel_dispatch=self.enable_parallel_dispatch, @@ -100,18 +100,18 @@ def initialize_modules(self, chat_llm: BaseLLM, process_llm: BaseLLM | None = No self.chat_llm = chat_llm self.process_llm = process_llm self.monitor = SchedulerGeneralMonitor(process_llm=self.process_llm, config=self.config) - self.thread_pool_monitor = SchedulerDispatcherMonitor(config=self.config) + self.dispatcher_monitor = SchedulerDispatcherMonitor(config=self.config) self.retriever = SchedulerRetriever(process_llm=self.process_llm, config=self.config) if self.enable_parallel_dispatch: - self.thread_pool_monitor.initialize(dispatcher=self.dispatcher) - self.thread_pool_monitor.start() + self.dispatcher_monitor.initialize(dispatcher=self.dispatcher) + self.dispatcher_monitor.start() # initialize with auth_cofig if self.auth_config_path is not None and Path(self.auth_config_path).exists(): - self.auth_config = AuthConfig.from_local_yaml(config_path=self.auth_config_path) + self.auth_config = AuthConfig.from_local_config(config_path=self.auth_config_path) elif AuthConfig.default_config_exists(): - self.auth_config = AuthConfig.from_local_yaml() + self.auth_config = AuthConfig.from_local_config() else: self.auth_config = None @@ -183,7 +183,7 @@ def transform_working_memories_to_monitors( ) result.append(mem_monitor) - logger.debug(f"Transformed {len(result)} memories to monitors") + logger.info(f"Transformed {len(result)} memories to monitors") return result def replace_working_memory( @@ -213,7 +213,7 @@ def replace_working_memory( # update working memory monitors query_keywords = query_monitor.get_keywords_collections() - logger.debug( + logger.info( f"Processing {len(memories_with_new_order)} memories with {len(query_keywords)} query keywords" ) new_working_memory_monitors = self.transform_working_memories_to_monitors( @@ -225,6 +225,7 @@ def replace_working_memory( for one in new_working_memory_monitors: one.sorting_score = 0 + logger.info(f"update {len(new_working_memory_monitors)} working_memory_monitors") self.monitor.update_working_memory_monitors( new_working_memory_monitors=new_working_memory_monitors, user_id=user_id, @@ -316,6 +317,7 @@ def update_activation_memory( cache_item = act_mem.extract(new_text_memory) cache_item.records.text_memories = new_text_memories + cache_item.records.timestamp = datetime.utcnow() act_mem.add([cache_item]) act_mem.dump(self.act_mem_dump_path) @@ -375,6 +377,11 @@ def update_activation_memory_periodically( logger.info( f"Collected {len(new_activation_memories)} new memory entries for processing" ) + # Print the content of each new activation memory + for i, memory in enumerate(new_activation_memories[:5], 1): + logger.info( + f"Part of New Activation Memorires | {i}/{len(new_activation_memories)}: {memory[:20]}" + ) self.update_activation_memory( new_memories=new_activation_memories, @@ -389,6 +396,7 @@ def update_activation_memory_periodically( logger.debug( f"Activation memory update completed at {self.monitor.last_activation_mem_update_time}" ) + else: logger.info( f"Skipping update - {interval_seconds} second interval not yet reached. " @@ -396,7 +404,7 @@ def update_activation_memory_periodically( f"{datetime.utcnow()}" ) except Exception as e: - logger.error(f"Error: {e}", exc_info=True) + logger.error(f"Error in update_activation_memory_periodically: {e}", exc_info=True) def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageItem]): """Submit multiple messages to the message queue.""" @@ -531,10 +539,15 @@ def stop(self) -> None: logger.info("Consumer thread stopped") # Shutdown dispatcher - if hasattr(self, "dispatcher") and self.dispatcher: + if self.dispatcher: logger.info("Shutting down dispatcher...") self.dispatcher.shutdown() + # Shutdown dispatcher_monitor + if self.dispatcher_monitor: + logger.info("Shutting down monitor...") + self.dispatcher_monitor.stop() + # Clean up queues self._cleanup_queues() logger.info("Memory Scheduler stopped completely") diff --git a/src/memos/mem_scheduler/general_modules/rabbitmq_service.py b/src/memos/mem_scheduler/general_modules/rabbitmq_service.py index c782c309..8865c223 100644 --- a/src/memos/mem_scheduler/general_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/general_modules/rabbitmq_service.py @@ -71,9 +71,9 @@ def initialize_rabbitmq( if config is None: if config_path is None and AuthConfig.default_config_exists(): - auth_config = AuthConfig.from_local_yaml() + auth_config = AuthConfig.from_local_config() elif Path(config_path).exists(): - auth_config = AuthConfig.from_local_yaml(config_path=config_path) + auth_config = AuthConfig.from_local_config(config_path=config_path) else: logger.error("Fail to initialize auth_config") return diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py index cb5a122a..0aa66707 100644 --- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py +++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py @@ -180,6 +180,11 @@ def log_activation_memory_update( mem_cube_id=mem_cube_id, mem_cube=mem_cube, ) + logger.info( + f"{len(added_memories)} {TEXT_MEMORY_TYPE} memorie(s) " + f"transformed to {ACTIVATION_MEMORY_TYPE} memories." + ) + log_message_b = self.create_autofilled_log_item( log_content=mem, label=label, @@ -189,12 +194,13 @@ def log_activation_memory_update( mem_cube_id=mem_cube_id, mem_cube=mem_cube, ) - log_func_callback([log_message_a, log_message_b]) logger.info( - f"{len(added_memories)} {LONG_TERM_MEMORY_TYPE} memorie(s) " - f"transformed to {WORKING_MEMORY_TYPE} memories." + f"{len(added_memories)} {ACTIVATION_MEMORY_TYPE} memorie(s) " + f"transformed to {PARAMETER_MEMORY_TYPE} memories." ) + log_func_callback([log_message_a, log_message_b]) + @log_exceptions(logger=logger) def log_adding_memory( self, diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index a4768625..08293886 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -15,6 +15,7 @@ ) from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.monitor_schemas import QueryMonitorItem +from memos.mem_scheduler.utils.filter_utils import is_all_chinese, is_all_english from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory @@ -141,6 +142,24 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: query_keywords = self.monitor.extract_query_keywords(query=query) logger.info(f'Extract keywords "{query_keywords}" from query "{query}"') + if len(query_keywords) == 0: + stripped_query = query.strip() + # Determine measurement method based on language + if is_all_english(stripped_query): + words = stripped_query.split() # Word count for English + elif is_all_chinese(stripped_query): + words = stripped_query # Character count for Chinese + else: + logger.debug( + f"Mixed-language memory, using character count: {stripped_query[:50]}..." + ) + words = stripped_query # Default to character count + + query_keywords = list(set(words[:20])) + logger.error( + f"Keyword extraction failed for query. Using fallback keywords: {query_keywords[:10]}... (truncated)" + ) + item = QueryMonitorItem( query_text=query, keywords=query_keywords, @@ -177,6 +196,20 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: ) logger.info(f"size of new_order_working_memory: {len(new_order_working_memory)}") + # update activation memories + logger.info( + f"Activation memory update {'enabled' if self.enable_activation_memory else 'disabled'} " + f"(interval: {self.monitor.act_mem_update_interval}s)" + ) + if self.enable_activation_memory: + self.update_activation_memory_periodically( + interval_seconds=self.monitor.act_mem_update_interval, + label=QUERY_LABEL, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=messages[0].mem_cube, + ) + def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: """ Process and handle answer trigger messages from the queue. @@ -199,16 +232,6 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: # for status update self._set_current_context_from_message(msg=messages[0]) - # update activation memories - if self.enable_act_memory_update: - self.update_activation_memory_periodically( - interval_seconds=self.monitor.act_mem_update_interval, - label=ANSWER_LABEL, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=messages[0].mem_cube, - ) - def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.") # Process the query in a session turn @@ -251,15 +274,6 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: log_func_callback=self._submit_web_logs, ) - # update activation memories - if self.enable_act_memory_update: - self.update_activation_memory_periodically( - interval_seconds=self.monitor.act_mem_update_interval, - label=ADD_LABEL, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=messages[0].mem_cube, - ) except Exception as e: logger.error(f"Error: {e}", exc_info=True) diff --git a/src/memos/mem_scheduler/monitors/dispatcher_monitor.py b/src/memos/mem_scheduler/monitors/dispatcher_monitor.py index 6e820980..229e9c3a 100644 --- a/src/memos/mem_scheduler/monitors/dispatcher_monitor.py +++ b/src/memos/mem_scheduler/monitors/dispatcher_monitor.py @@ -21,8 +21,8 @@ def __init__(self, config: BaseSchedulerConfig): super().__init__() self.config: BaseSchedulerConfig = config - self.check_interval = self.config.get("thread_pool_monitor_check_interval", 60) - self.max_failures = self.config.get("thread_pool_monitor_max_failures", 2) + self.check_interval = self.config.get("dispatcher_monitor_check_interval", 60) + self.max_failures = self.config.get("dispatcher_monitor_max_failures", 2) # Registry of monitored thread pools self._pools: dict[str, dict] = {} @@ -133,14 +133,33 @@ def start(self) -> bool: return True def stop(self) -> None: - """Stop the monitoring thread gracefully.""" + """ + Stop the monitoring thread and clean up all managed thread pools. + Ensures proper shutdown of all monitored executors. + """ if not self._running: return + # Stop the monitoring loop self._running = False if self._monitor_thread and self._monitor_thread.is_alive(): self._monitor_thread.join(timeout=5) - logger.info("Thread pool monitor stopped") + + # Shutdown all registered pools + with self._pool_lock: + for name, pool_info in self._pools.items(): + executor = pool_info["executor"] + if not executor._shutdown: # pylint: disable=protected-access + try: + logger.info(f"Shutting down thread pool '{name}'") + executor.shutdown(wait=True, cancel_futures=True) + logger.info(f"Successfully shut down thread pool '{name}'") + except Exception as e: + logger.error(f"Error shutting down pool '{name}': {e!s}", exc_info=True) + + # Clear the pool registry + self._pools.clear() + logger.info("Thread pool monitor and all pools stopped") def _check_pools_health(self) -> None: """Check health of all registered thread pools.""" diff --git a/src/memos/mem_scheduler/monitors/general_monitor.py b/src/memos/mem_scheduler/monitors/general_monitor.py index 0c4174fa..6bc796cc 100644 --- a/src/memos/mem_scheduler/monitors/general_monitor.py +++ b/src/memos/mem_scheduler/monitors/general_monitor.py @@ -42,8 +42,12 @@ def __init__(self, process_llm: BaseLLM, config: BaseSchedulerConfig): # Partial Retention Strategy self.partial_retention_number = 2 - self.working_mem_monitor_capacity = DEFAULT_WORKING_MEM_MONITOR_SIZE_LIMIT - self.activation_mem_monitor_capacity = DEFAULT_ACTIVATION_MEM_MONITOR_SIZE_LIMIT + self.working_mem_monitor_capacity = self.config.get( + "working_mem_monitor_capacity", DEFAULT_WORKING_MEM_MONITOR_SIZE_LIMIT + ) + self.activation_mem_monitor_capacity = self.config.get( + "activation_mem_monitor_capacity", DEFAULT_ACTIVATION_MEM_MONITOR_SIZE_LIMIT + ) # attributes # recording query_messages diff --git a/src/memos/mem_scheduler/schemas/general_schemas.py b/src/memos/mem_scheduler/schemas/general_schemas.py index 070026f0..a81caf5a 100644 --- a/src/memos/mem_scheduler/schemas/general_schemas.py +++ b/src/memos/mem_scheduler/schemas/general_schemas.py @@ -14,8 +14,8 @@ TextMemory_SEARCH_METHOD = "text_memory_search" DIRECT_EXCHANGE_TYPE = "direct" FANOUT_EXCHANGE_TYPE = "fanout" -DEFAULT_WORKING_MEM_MONITOR_SIZE_LIMIT = 20 -DEFAULT_ACTIVATION_MEM_MONITOR_SIZE_LIMIT = 5 +DEFAULT_WORKING_MEM_MONITOR_SIZE_LIMIT = 30 +DEFAULT_ACTIVATION_MEM_MONITOR_SIZE_LIMIT = 20 DEFAULT_ACT_MEM_DUMP_PATH = f"{BASE_DIR}/outputs/mem_scheduler/mem_cube_scheduler_test.kv_cache" DEFAULT_THREAD__POOL_MAX_WORKERS = 5 DEFAULT_CONSUME_INTERVAL_SECONDS = 3 diff --git a/src/memos/mem_scheduler/schemas/message_schemas.py b/src/memos/mem_scheduler/schemas/message_schemas.py index 0a540574..9b5bd5d8 100644 --- a/src/memos/mem_scheduler/schemas/message_schemas.py +++ b/src/memos/mem_scheduler/schemas/message_schemas.py @@ -138,6 +138,7 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin): def debug_info(self) -> dict[str, Any]: """Return structured debug information for logging purposes.""" return { + "content_preview:": self.log_content[:50], "log_id": self.item_id, "user_id": self.user_id, "mem_cube_id": self.mem_cube_id, diff --git a/src/memos/mem_scheduler/utils/misc_utils.py b/src/memos/mem_scheduler/utils/misc_utils.py index 4a9c0246..aa9b5c48 100644 --- a/src/memos/mem_scheduler/utils/misc_utils.py +++ b/src/memos/mem_scheduler/utils/misc_utils.py @@ -1,4 +1,5 @@ import json +import re from functools import wraps from pathlib import Path @@ -12,12 +13,52 @@ def extract_json_dict(text: str): + """ + Safely extracts JSON from LLM response text with robust error handling. + + Args: + text: Raw text response from LLM that may contain JSON + + Returns: + Parsed JSON data (dict or list) + + Raises: + ValueError: If no valid JSON can be extracted + """ + if not text: + raise ValueError("Empty input text") + + # Normalize the text text = text.strip() + + # Remove common code block markers patterns_to_remove = ["json```", "```python", "```json", "latex```", "```latex", "```"] for pattern in patterns_to_remove: text = text.replace(pattern, "") - res = json.loads(text.strip()) - return res + + # Try: direct JSON parse first + try: + return json.loads(text.strip()) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON from text: {text}. Error: {e!s}", exc_info=True) + + # Fallback 1: Extract JSON using regex + json_pattern = r"\{[\s\S]*\}|\[[\s\S]*\]" + matches = re.findall(json_pattern, text) + if matches: + try: + return json.loads(matches[0]) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON from text: {text}. Error: {e!s}", exc_info=True) + + # Fallback 2: Handle malformed JSON (common LLM issues) + try: + # Try adding missing quotes around keys + text = re.sub(r"([\{\s,])(\w+)(:)", r'\1"\2"\3', text) + return json.loads(text) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON from text: {text}. Error: {e!s}", exc_info=True) + raise ValueError(text) from e def parse_yaml(yaml_file: str | Path): diff --git a/src/memos/memories/activation/item.py b/src/memos/memories/activation/item.py index 8babfeef..ba161937 100644 --- a/src/memos/memories/activation/item.py +++ b/src/memos/memories/activation/item.py @@ -23,7 +23,7 @@ class KVCacheRecords(BaseModel): description="Single string combining all text_memories using assembly template", ) timestamp: datetime = Field( - default_factory=datetime.now, description="submit time for schedule_messages" + default_factory=datetime.utcnow, description="submit time for schedule_messages" ) diff --git a/src/memos/templates/mem_scheduler_prompts.py b/src/memos/templates/mem_scheduler_prompts.py index a15021d9..a1fa5324 100644 --- a/src/memos/templates/mem_scheduler_prompts.py +++ b/src/memos/templates/mem_scheduler_prompts.py @@ -86,10 +86,18 @@ - Queries: Recent user questions/requests (list) - Current Order: Existing memory sequence (list of strings with indices) -## Output Requirements -Return a JSON object with: -- "new_order": The reordered indices (array of integers) -- "reasoning": Brief explanation of your ranking logic (1-2 sentences) +## Output Format Requirements +You MUST output a valid JSON object with EXACTLY the following structure: +{{ + "new_order": [array_of_integers], + "reasoning": "string_explanation" +}} + +## Important Notes: +- Only output the JSON object, nothing else +- Do not include any markdown formatting or code block notation +- Ensure all brackets and quotes are properly closed +- The output must be parseable by a JSON parser ## Processing Guidelines 1. Prioritize evidence that: From b959bb347bcfe33ef49302d3b1bdee5144a8b384 Mon Sep 17 00:00:00 2001 From: chentang Date: Wed, 6 Aug 2025 19:44:24 +0800 Subject: [PATCH 5/5] modify datetime to utc date time --- src/memos/mem_os/product.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/mem_os/product.py b/src/memos/mem_os/product.py index 8224f08b..214866e5 100644 --- a/src/memos/mem_os/product.py +++ b/src/memos/mem_os/product.py @@ -491,7 +491,7 @@ def _send_message_to_scheduler( mem_cube=self.mem_cubes[mem_cube_id], label=label, content=query, - timestamp=datetime.now().isoformat(), + timestamp=datetime.utcnow(), ) self.mem_scheduler.submit_messages(messages=[message_item])