@@ -1,6 +1,5 @@
import json
import re

from datetime import datetime

from dateutil import parser
@@ -11,82 +10,68 @@
from memos.log import get_logger
from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata
from memos.templates.tree_reorganize_prompts import (
-    CONFLICT_DETECTOR_PROMPT,
-    CONFLICT_RESOLVER_PROMPT,
+    MEMORY_RELATION_DETECTOR_PROMPT,
+    MEMORY_RELATION_RESOLVER_PROMPT,
)


logger = get_logger(__name__)


-class ConflictHandler:
+class NodeHandler:
    EMBEDDING_THRESHOLD: float = 0.8  # Threshold for embedding similarity to consider conflict

    def __init__(self, graph_store: Neo4jGraphDB, llm: BaseLLM, embedder: BaseEmbedder):
        self.graph_store = graph_store
        self.llm = llm
        self.embedder = embedder

-    def detect(
-        self, memory: TextualMemoryItem, top_k: int = 5, scope: str | None = None
-    ) -> list[tuple[TextualMemoryItem, TextualMemoryItem]]:
-        """
-        Detect conflicts by finding the most similar items in the graph database based on embedding, then use LLM to judge conflict.
-        Args:
-            memory: The memory item (should have an embedding attribute or field).
-            top_k: Number of top similar nodes to retrieve.
-            scope: Optional memory type filter.
-        Returns:
-            List of conflict pairs (each pair is a tuple: (memory, candidate)).
-        """
+    def detect(self, memory, top_k: int = 5, scope=None):
        # 1. Search for similar memories based on embedding
        embedding = memory.metadata.embedding
        embedding_candidates_info = self.graph_store.search_by_embedding(
-            embedding, top_k=top_k, scope=scope
+            embedding, top_k=top_k, scope=scope, threshold=self.EMBEDDING_THRESHOLD
        )
        # 2. Filter based on similarity threshold
        embedding_candidates_ids = [
-            info["id"]
-            for info in embedding_candidates_info
-            if info["score"] >= self.EMBEDDING_THRESHOLD and info["id"] != memory.id
+            info["id"] for info in embedding_candidates_info if info["id"] != memory.id
        ]
        # 3. Judge conflicts using LLM
        embedding_candidates = self.graph_store.get_nodes(embedding_candidates_ids)
-        conflict_pairs = []
+        detected_relationships = []
        for embedding_candidate in embedding_candidates:
            embedding_candidate = TextualMemoryItem.from_dict(embedding_candidate)
            prompt = [
-                {
-                    "role": "system",
-                    "content": "You are a conflict detector for memory items.",
-                },
                {
                    "role": "user",
-                    "content": CONFLICT_DETECTOR_PROMPT.format(
-                        statement_1=memory.memory,
-                        statement_2=embedding_candidate.memory,
+                    "content": MEMORY_RELATION_DETECTOR_PROMPT.format(
+                        statement_1=memory.memory, statement_2=embedding_candidate.memory
                    ),
-                },
+                }
            ]
            result = self.llm.generate(prompt).strip()
-            if "yes" in result.lower():
-                conflict_pairs.append([memory, embedding_candidate])
-        if len(conflict_pairs):
-            conflict_text = "\n".join(
-                f'"{pair[0].memory!s}" <==CONFLICT==> "{pair[1].memory!s}"'
-                for pair in conflict_pairs
-            )
-            logger.warning(
-                f"Detected {len(conflict_pairs)} conflicts for memory {memory.id}\n {conflict_text}"
-            )
-        return conflict_pairs
+            if result == "contradictory":
+                logger.warning(
+                    f'detected "{memory.memory}" <==CONFLICT==> "{embedding_candidate.memory}"'
+                )
+                detected_relationships.append([memory, embedding_candidate, "contradictory"])
+            elif result == "redundant":
+                logger.warning(
+                    f'detected "{memory.memory}" <==REDUNDANT==> "{embedding_candidate.memory}"'
+                )
+                detected_relationships.append([memory, embedding_candidate, "redundant"])
+            elif result == "independent":
+                pass
+            else:
+                pass
+        return detected_relationships

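A minimal usage sketch of the reshaped detection API (not part of the diff): detect() now returns [memory_a, memory_b, relation] triples rather than bare conflict pairs, so the caller dispatches on the relation label. The handler's constructor arguments and the item variable are assumed to be built elsewhere, and the scope value is illustrative.

    handler = NodeHandler(graph_store, llm, embedder)
    for mem_a, mem_b, relation in handler.detect(item, top_k=5, scope="LongTermMemory"):
        # relation is "contradictory" or "redundant"; "independent" results are dropped
        handler.resolve(mem_a, mem_b, relation)
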
-    def resolve(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem) -> None:
+    def resolve(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem, relation) -> None:
        """
        Resolve detected conflicts between two memory items using LLM fusion.
        Args:
            memory_a: The first conflicting memory item.
            memory_b: The second conflicting memory item.
+            relation: the detected relation between the two items ("contradictory" or "redundant")
        Returns:
            A fused TextualMemoryItem representing the resolved memory.
        """
@@ -96,13 +81,10 @@ def resolve(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem) -> None:
        metadata_1 = memory_a.metadata.model_dump_json(include=metadata_for_resolve)
        metadata_2 = memory_b.metadata.model_dump_json(include=metadata_for_resolve)
        prompt = [
-            {
-                "role": "system",
-                "content": "",
-            },
            {
                "role": "user",
-                "content": CONFLICT_RESOLVER_PROMPT.format(
+                "content": MEMORY_RELATION_RESOLVER_PROMPT.format(
+                    relation=relation,
                    statement_1=memory_a.memory,
                    metadata_1=metadata_1,
                    statement_2=memory_b.memory,
@@ -119,7 +101,7 @@ def resolve(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem) -> None:
        # —————— 2.1 Can't resolve conflict, hard update by comparing timestamp ————
        if len(answer) <= 10 and "no" in answer.lower():
            logger.warning(
-                f"Conflict between {memory_a.id} and {memory_b.id} could not be resolved. "
+                f"{relation} between {memory_a.id} and {memory_b.id} could not be resolved. "
            )
            self._hard_update(memory_a, memory_b)
        # —————— 2.2 Conflict resolved, update metadata and memory ————
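The hunk cuts off before _hard_update, so the fallback body is not shown. A plausible sketch of the timestamp comparison the comment above describes, assuming each item's metadata carries an ISO-8601 updated_at string (dateutil's parser is already imported at the top of this file):

    def _hard_update(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem) -> None:
        # Keep the newer statement; archive the older one in the graph.
        older = (
            memory_a
            if parser.parse(memory_a.metadata.updated_at) < parser.parse(memory_b.metadata.updated_at)
            else memory_b
        )
        self.graph_store.update_node(older.id, {"status": "archived"})
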
src/memos/memories/textual/tree_text_memory/organize/manager.py (100 changes: 6 additions & 94 deletions)
@@ -158,106 +158,18 @@ def _add_to_graph_memory(self, memory: TextualMemoryItem, memory_type: str):
        - topic_summary_prefix: summary node id prefix if applicable
        - enable_summary_link: whether to auto-link to a summary node
        """
-        embedding = memory.metadata.embedding
-
-        # Step 1: Find similar nodes for possible merging
-        similar_nodes = self.graph_store.search_by_embedding(
-            vector=embedding,
-            top_k=3,
-            scope=memory_type,
-            threshold=self._threshold,
-            status="activated",
-        )
-
-        if similar_nodes and similar_nodes[0]["score"] > self._merged_threshold:
-            return self._merge(memory, similar_nodes)
-        else:
-            node_id = str(uuid.uuid4())
-            # Step 2: Add new node to graph
-            self.graph_store.add_node(
-                node_id, memory.memory, memory.metadata.model_dump(exclude_none=True)
-            )
-            self.reorganizer.add_message(
-                QueueMessage(
-                    op="add",
-                    after_node=[node_id],
-                )
-            )
-            return node_id
-
-    def _merge(self, source_node: TextualMemoryItem, similar_nodes: list[dict]) -> str:
-        """
-        TODO: Add node traceability support by optionally preserving source nodes and linking them with MERGED_FROM edges.
-
-        Merge the source memory into the most similar existing node (only one),
-        and establish a MERGED_FROM edge in the graph.
-
-        Parameters:
-            source_node: The new memory item (not yet in the graph)
-            similar_nodes: A list of dicts returned by search_by_embedding(), ordered by similarity
-        """
-        original_node = similar_nodes[0]
-        original_id = original_node["id"]
-        original_data = self.graph_store.get_node(original_id)
-
-        target_text = original_data.get("memory", "")
-        merged_text = f"{target_text}\n⟵MERGED⟶\n{source_node.memory}"
-
-        original_meta = TreeNodeTextualMemoryMetadata(**original_data["metadata"])
-        source_meta = source_node.metadata
-
-        merged_key = source_meta.key or original_meta.key
-        merged_tags = list(set((original_meta.tags or []) + (source_meta.tags or [])))
-        merged_sources = list(set((original_meta.sources or []) + (source_meta.sources or [])))
-        merged_background = f"{original_meta.background}\n⟵MERGED⟶\n{source_meta.background}"
-        merged_embedding = self.embedder.embed([merged_text])[0]
-
-        original_conf = original_meta.confidence or 0.0
-        source_conf = source_meta.confidence or 0.0
-        merged_confidence = float((original_conf + source_conf) / 2)
-        merged_usage = list(set((original_meta.usage or []) + (source_meta.usage or [])))
-
-        # Create new merged node
-        merged_id = str(uuid.uuid4())
-        merged_metadata = source_meta.model_copy(
-            update={
-                "embedding": merged_embedding,
-                "updated_at": datetime.now().isoformat(),
-                "key": merged_key,
-                "tags": merged_tags,
-                "sources": merged_sources,
-                "background": merged_background,
-                "confidence": merged_confidence,
-                "usage": merged_usage,
-            }
-        )
-
+        node_id = str(uuid.uuid4())
+        # Step 2: Add new node to graph
        self.graph_store.add_node(
-            merged_id, merged_text, merged_metadata.model_dump(exclude_none=True)
+            node_id, memory.memory, memory.metadata.model_dump(exclude_none=True)
        )

-        # Add traceability edges: both original and new point to merged node
-        self.graph_store.add_edge(original_id, merged_id, type="MERGED_TO")
-        self.graph_store.update_node(original_id, {"status": "archived"})
-        source_id = str(uuid.uuid4())
-        source_metadata = source_node.metadata.model_copy(update={"status": "archived"})
-        self.graph_store.add_node(source_id, source_node.memory, source_metadata.model_dump())
-        self.graph_store.add_edge(source_id, merged_id, type="MERGED_TO")
-        # After creating merged node and tracing lineage
-        self._inherit_edges(original_id, merged_id)
-
-        # log to reorganizer before updating the graph
        self.reorganizer.add_message(
            QueueMessage(
-                op="merge",
-                before_node=[
-                    original_id,
-                    source_node.id,
-                ],
-                after_node=[merged_id],
+                op="add",
                after_node=[node_id],
            )
        )
-        return merged_id
+        return node_id

    def _inherit_edges(self, from_id: str, to_id: str) -> None:
        """
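Taken together, the two files move merge/dedup work off the write path: _add_to_graph_memory now always inserts a fresh node and enqueues an "add" QueueMessage, and relation handling presumably runs later on the reorganizer side. A sketch (not shown in this diff) of how a reorganizer consumer could route such a message through NodeHandler, reusing graph_store.get_node and TextualMemoryItem.from_dict as the code above does; the consumer function itself is an assumption:

    def on_queue_message(handler: NodeHandler, graph_store: Neo4jGraphDB, msg: QueueMessage) -> None:
        if msg.op != "add":
            return
        for node_id in msg.after_node:
            # Rehydrate the stored node, then detect and resolve its relations
            item = TextualMemoryItem.from_dict(graph_store.get_node(node_id))
            for mem_a, mem_b, relation in handler.detect(item, top_k=5):
                handler.resolve(mem_a, mem_b, relation)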