Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
backend: general_scheduler
config:
top_k: 10
top_n: 5
top_n: 10
act_mem_update_interval: 30
context_window_size: 5
context_window_size: 10
thread_pool_max_workers: 5
consume_interval_seconds: 3
consume_interval_seconds: 1
enable_parallel_dispatch: true
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ mem_scheduler:
backend: "general_scheduler"
config:
top_k: 10
top_n: 5
top_n: 10
act_mem_update_interval: 30
context_window_size: 5
context_window_size: 10
thread_pool_max_workers: 10
consume_interval_seconds: 3
consume_interval_seconds: 1
enable_parallel_dispatch: true
max_turns_window: 20
top_k: 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ mem_reader:
mem_scheduler:
backend: "general_scheduler"
config:
top_k: 2
top_n: 5
top_k: 10
top_n: 10
act_mem_update_interval: 30
context_window_size: 5
context_window_size: 10
thread_pool_max_workers: 10
consume_interval_seconds: 3
consume_interval_seconds: 1
enable_parallel_dispatch: true
max_turns_window: 20
top_k: 5
Expand Down
2 changes: 1 addition & 1 deletion examples/mem_os/chat_w_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from memos.configs.mem_os import MOSConfig
from memos.mem_cube.general import GeneralMemCube
from memos.mem_os.main import MOS
from memos.mem_scheduler.utils import parse_yaml
from memos.mem_scheduler.utils.misc_utils import parse_yaml


# init MOS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,29 @@

from datetime import datetime
from pathlib import Path
from queue import Queue
from typing import TYPE_CHECKING

from memos.configs.mem_cube import GeneralMemCubeConfig
from memos.configs.mem_os import MOSConfig
from memos.configs.mem_scheduler import AuthConfig, SchedulerConfigFactory
from memos.log import get_logger
from memos.mem_cube.general import GeneralMemCube
from memos.mem_os.main import MOS
from memos.mem_scheduler.modules.schemas import (
from memos.mem_scheduler.general_scheduler import GeneralScheduler
from memos.mem_scheduler.scheduler_factory import SchedulerFactory
from memos.mem_scheduler.schemas.general_schemas import (
ANSWER_LABEL,
QUERY_LABEL,
ScheduleMessageItem,
)
from memos.mem_scheduler.scheduler_factory import SchedulerFactory
from memos.mem_scheduler.utils import parse_yaml
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
from memos.mem_scheduler.utils.misc_utils import parse_yaml


if TYPE_CHECKING:
from memos.mem_scheduler.schemas import (
ScheduleLogForWebItem,
)


FILE_PATH = Path(__file__).absolute()
Expand Down Expand Up @@ -109,6 +118,8 @@ def run_with_automatic_scheduler_init():
response = mos.chat(query, user_id=user_id)
print(f"Query:\n {query}\n\nAnswer:\n {response}")

show_web_logs(mem_scheduler=mos.mem_scheduler)

mos.mem_scheduler.stop()


Expand Down Expand Up @@ -184,9 +195,46 @@ def run_with_manual_scheduler_init():
mos.mem_scheduler.submit_messages(messages=message_item)
print(f"Query:\n {query}\n\nAnswer:\n {response}")

show_web_logs(mem_scheduler=mos.mem_scheduler)

mos.mem_scheduler.stop()


def show_web_logs(mem_scheduler: GeneralScheduler):
"""Display all web log entries from the scheduler's log queue.

Args:
mem_scheduler: The scheduler instance containing web logs to display
"""
if mem_scheduler._web_log_message_queue.empty():
print("Web log queue is currently empty.")
return

print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)

# Create a temporary queue to preserve the original queue contents
temp_queue = Queue()
log_count = 0

while not mem_scheduler._web_log_message_queue.empty():
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
temp_queue.put(log_item)
log_count += 1

# Print log entry details
print(f"\nLog Entry #{log_count}:")
print(f'- "{log_item.label}" log: {log_item}')

print("-" * 50)

# Restore items back to the original queue
while not temp_queue.empty():
mem_scheduler._web_log_message_queue.put(temp_queue.get())

print(f"\nTotal {log_count} web log entries displayed.")
print("=" * 110 + "\n")


if __name__ == "__main__":
run_with_automatic_scheduler_init()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
import json
import shutil
import sys

from pathlib import Path
from queue import Queue
from typing import TYPE_CHECKING

from memos.configs.mem_cube import GeneralMemCubeConfig
from memos.configs.mem_os import MOSConfig
from memos.configs.mem_scheduler import AuthConfig
from memos.log import get_logger
from memos.mem_cube.general import GeneralMemCube
from memos.mem_scheduler.general_scheduler import GeneralScheduler
from memos.mem_scheduler.mos_for_test_scheduler import MOSForTestScheduler


if TYPE_CHECKING:
from memos.mem_scheduler.modules.schemas import (
ScheduleLogForWebItem,
)


FILE_PATH = Path(__file__).absolute()
BASE_DIR = FILE_PATH.parent.parent.parent
sys.path.insert(0, str(BASE_DIR)) # Enable execution from any working directory
Expand Down Expand Up @@ -90,41 +82,6 @@ def init_task():
return conversations, questions


def show_web_logs(mem_scheduler: GeneralScheduler):
"""Display all web log entries from the scheduler's log queue.

Args:
mem_scheduler: The scheduler instance containing web logs to display
"""
if mem_scheduler._web_log_message_queue.empty():
print("Web log queue is currently empty.")
return

print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)

# Create a temporary queue to preserve the original queue contents
temp_queue = Queue()
log_count = 0

while not mem_scheduler._web_log_message_queue.empty():
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
temp_queue.put(log_item)
log_count += 1

# Print log entry details
print(f"\nLog Entry #{log_count}:")
print(f'- "{log_item.label}" log: {log_item}')

print("-" * 50)

# Restore items back to the original queue
while not temp_queue.empty():
mem_scheduler._web_log_message_queue.put(temp_queue.get())

print(f"\nTotal {log_count} web log entries displayed.")
print("=" * 110 + "\n")


if __name__ == "__main__":
# set up data
conversations, questions = init_task()
Expand Down Expand Up @@ -168,12 +125,18 @@ def show_web_logs(mem_scheduler: GeneralScheduler):

mos.add(conversations, user_id=user_id, mem_cube_id=mem_cube_id)

# Add interfering conversations
file_path = Path(f"{BASE_DIR}/examples/data/mem_scheduler/scene_data.json")
scene_data = json.load(file_path.open("r", encoding="utf-8"))
mos.add(scene_data[0], user_id=user_id, mem_cube_id=mem_cube_id)
mos.add(scene_data[1], user_id=user_id, mem_cube_id=mem_cube_id)

for item in questions:
print("===== Chat Start =====")
query = item["question"]

print(f"Query:\n {query}\n")
response = mos.chat(query=query, user_id=user_id)
print(f"Query:\n {query}\n\nAnswer:\n {response}")

show_web_logs(mos.mem_scheduler)
print(f"Answer:\n {response}")
print("===== Chat End =====")

mos.mem_scheduler.stop()
10 changes: 6 additions & 4 deletions examples/mem_scheduler/rabbitmq_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
def publish_message(rabbitmq_module, message):
"""Function to publish a message."""
rabbitmq_module.rabbitmq_publish_message(message)
print(f"Published message: {message}")
print(f"Published message: {message}\n")


def main():
Expand All @@ -24,8 +24,7 @@ def main():
rabbitmq_module.initialize_rabbitmq(config=AuthConfig.from_local_yaml().rabbitmq)

try:
# Start consumer
rabbitmq_module.rabbitmq_start_consuming()
rabbitmq_module.wait_for_connection_ready()

# === Publish some test messages ===
# List to hold thread references
Expand All @@ -38,6 +37,9 @@ def main():
thread.start()
threads.append(thread)

# Start consumer
rabbitmq_module.rabbitmq_start_consuming()

# Join threads to ensure all messages are published before proceeding
for thread in threads:
thread.join()
Expand All @@ -47,7 +49,7 @@ def main():

finally:
# Give some time for cleanup
time.sleep(5)
time.sleep(3)

# Close connections
rabbitmq_module.rabbitmq_close()
Expand Down
3 changes: 2 additions & 1 deletion examples/mem_scheduler/redis_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

from memos.configs.mem_scheduler import SchedulerConfigFactory
from memos.mem_cube.general import GeneralMemCube
from memos.mem_scheduler.modules.schemas import QUERY_LABEL, ScheduleMessageItem
from memos.mem_scheduler.scheduler_factory import SchedulerFactory
from memos.mem_scheduler.schemas.general_schemas import QUERY_LABEL
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem


if TYPE_CHECKING:
Expand Down
10 changes: 6 additions & 4 deletions examples/mem_scheduler/try_schedule_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
from memos.log import get_logger
from memos.mem_cube.general import GeneralMemCube
from memos.mem_scheduler.general_scheduler import GeneralScheduler
from memos.mem_scheduler.modules.schemas import NOT_APPLICABLE_TYPE
from memos.mem_scheduler.mos_for_test_scheduler import MOSForTestScheduler
from memos.mem_scheduler.schemas.general_schemas import (
NOT_APPLICABLE_TYPE,
)


if TYPE_CHECKING:
from memos.mem_scheduler.modules.schemas import (
from memos.mem_scheduler.schemas import (
ScheduleLogForWebItem,
)

Expand Down Expand Up @@ -175,14 +177,14 @@ def show_web_logs(mem_scheduler: GeneralScheduler):
query = item["question"]

# test process_session_turn
mos.mem_scheduler.process_session_turn(
working_memory, new_candidates = mos.mem_scheduler.process_session_turn(
queries=[query],
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
top_k=10,
query_history=None,
)
print(f"\nnew_candidates: {[one.memory for one in new_candidates]}")

# test activation memory update
mos.mem_scheduler.update_activation_memory_periodically(
Expand Down
22 changes: 18 additions & 4 deletions src/memos/configs/mem_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from pydantic import ConfigDict, Field, field_validator, model_validator

from memos.configs.base import BaseConfig
from memos.mem_scheduler.modules.schemas import (
from memos.mem_scheduler.modules.misc import DictConversionMixin
from memos.mem_scheduler.schemas.general_schemas import (
BASE_DIR,
DEFAULT_ACT_MEM_DUMP_PATH,
DEFAULT_CONSUME_INTERVAL_SECONDS,
DEFAULT_THREAD__POOL_MAX_WORKERS,
DictConversionMixin,
)


Expand All @@ -21,6 +21,7 @@ class BaseSchedulerConfig(BaseConfig):
top_k: int = Field(
default=10, description="Number of top candidates to consider in initial retrieval"
)
# TODO: The 'top_n' field is deprecated and will be removed in future versions.
top_n: int = Field(default=5, description="Number of final results to return after processing")
enable_parallel_dispatch: bool = Field(
default=True, description="Whether to enable parallel message processing using thread pool"
Expand Down Expand Up @@ -48,7 +49,7 @@ class GeneralSchedulerConfig(BaseSchedulerConfig):
default=300, description="Interval in seconds for updating activation memory"
)
context_window_size: int | None = Field(
default=5, description="Size of the context window for conversation history"
default=10, description="Size of the context window for conversation history"
)
act_mem_dump_path: str | None = Field(
default=DEFAULT_ACT_MEM_DUMP_PATH, # Replace with DEFAULT_ACT_MEM_DUMP_PATH
Expand Down Expand Up @@ -105,7 +106,20 @@ class RabbitMQConfig(


class GraphDBAuthConfig(BaseConfig):
uri: str = Field(default="localhost", description="URI for graph database access")
uri: str = Field(
default="bolt://localhost:7687",
description="URI for graph database access (e.g., bolt://host:port)",
)
user: str = Field(default="neo4j", description="Username for graph database authentication")
password: str = Field(
default="",
description="Password for graph database authentication",
min_length=8, # 建议密码最小长度
)
db_name: str = Field(default="neo4j", description="Database name to connect to")
auto_create: bool = Field(
default=True, description="Whether to automatically create the database if it doesn't exist"
)


class OpenAIConfig(BaseConfig):
Expand Down
Loading