-
Notifications
You must be signed in to change notification settings - Fork 117
feat: add VectorStoreWriter tool #451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
039b327
feat: add VectorStoreWriter tool
tyaroshko e1086b4
fix: formatting
tyaroshko dac6373
chore: merge changes from main branch
tyaroshko 7469f47
chore: merge changes from main branch
tyaroshko fd96935
refactor: update tool description for VectorStoreWriter
tyaroshko 079c539
feat: add PreprocessTool
tyaroshko 6ccc4de
feat: add vector store write pipeline example
tyaroshko 29ca7ae
chore: merge changes from main branch
tyaroshko d414a71
chore: merge changes from main branch
tyaroshko a5b38a2
chore: merge changes from main branch
tyaroshko 5ee97c6
refactor: remove content key from Writer
tyaroshko e27ef12
chore: merge changes from main branch
tyaroshko File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| from typing import Any, ClassVar, Literal | ||
|
|
||
| from pydantic import BaseModel, Field | ||
|
|
||
| from dynamiq.components.splitters.document import DocumentSplitBy | ||
| from dynamiq.components.splitters.document import DocumentSplitter as DocumentSplitterComponent | ||
| from dynamiq.nodes.node import Node, NodeGroup, ensure_config | ||
| from dynamiq.runnables import RunnableConfig | ||
| from dynamiq.types import Document | ||
| from dynamiq.utils.logger import logger | ||
|
|
||
# Tool description surfaced to LLM agents so they can decide when and how to
# call PreprocessTool. The text doubles as lightweight usage documentation:
# it lists the accepted parameters and shows example JSON payloads.
# NOTE: this is a runtime string consumed by agents — edit wording with care.
PREPROCESS_TOOL_DESCRIPTION = """Preprocesses text by splitting it into smaller parts.

Key Capabilities:
- Splitting text into smaller parts based on configurable parameters
- Dynamic parameter selection that allows to choose the optimal splitting strategy

Parameter Guide:
- documents: List of documents to split.
- split_by: The unit by which the document should be split. \
Possible values are "word", "sentence", "page", "passage", "title", "character". Defaults to "sentence".
- split_length: The maximum number of units to include in each split. Defaults to 10.
- split_overlap: The number of units that should overlap between consecutive splits. Defaults to 0.

Examples:
- {"documents": [{"content": "<content of the document>",\
"metadata": {"<field name>": "<field value>"}}]}
- {"documents": [{"content": "<content of the first document>",\
"metadata": {"<field name>": "<field value>"}},{"content": "<content of the second document>",\
"metadata": {"<field name>": "<field value>"}}]}
"""
|
|
||
|
|
||
class PreprocessToolInputSchema(BaseModel):
    """Input schema for PreprocessTool.

    Carries the documents to split plus the optional splitting parameters.
    Defaults mirror the defaults declared on PreprocessTool itself.
    """

    documents: list[Document] = Field(..., description="Parameter to provide documents to split.")
    split_by: DocumentSplitBy = Field(
        default=DocumentSplitBy.SENTENCE,
        description="Parameter to provide the unit by which the document should be split.",
    )
    split_length: int = Field(
        default=10, description="Parameter to provide the maximum number of units to include in each split."
    )
    split_overlap: int = Field(
        default=0,
        description="Parameter to provide the number of units that should overlap between consecutive splits.",
    )
|
|
||
|
|
||
class PreprocessTool(Node):
    """
    A tool for preprocessing text by splitting it into smaller parts.

    Wraps the DocumentSplitter component as a Node so that agents and
    workflows can split documents with configurable parameters.

    Attributes:
        group (Literal[NodeGroup.SPLITTERS]): Group for the node. Defaults to NodeGroup.SPLITTERS.
        name (str): Name of the tool. Defaults to "PreprocessTool".
        description (str): Description surfaced to agents. Defaults to PREPROCESS_TOOL_DESCRIPTION.
        split_by (DocumentSplitBy): Default unit to split by when the input omits it.
        split_length (int): Default maximum number of units per split.
        split_overlap (int): Default number of overlapping units between consecutive splits.
    """

    group: Literal[NodeGroup.SPLITTERS] = NodeGroup.SPLITTERS
    name: str = "PreprocessTool"
    description: str = PREPROCESS_TOOL_DESCRIPTION
    split_by: DocumentSplitBy = DocumentSplitBy.SENTENCE
    split_length: int = 10
    split_overlap: int = 0
    input_schema: ClassVar[type[PreprocessToolInputSchema]] = PreprocessToolInputSchema

    def execute(self, input_data: PreprocessToolInputSchema, config: RunnableConfig = None, **kwargs) -> dict[str, Any]:
        """Splits the documents into chunks based on the provided parameters.

        Args:
            input_data (PreprocessToolInputSchema): The input data containing the documents to split.
            config (RunnableConfig, optional): The configuration for the execution. Defaults to None.
            **kwargs: Additional keyword arguments.

        Returns:
            dict[str, Any]: A dictionary containing the split documents under the key "content".
        """
        config = ensure_config(config)
        self.run_on_node_execute_run(config.callbacks, **kwargs)

        documents = input_data.documents

        # Fall back to the node-level defaults when the input omits a parameter.
        split_by = input_data.split_by or self.split_by
        split_length = input_data.split_length if input_data.split_length is not None else self.split_length
        split_overlap = input_data.split_overlap if input_data.split_overlap is not None else self.split_overlap

        logger.debug(
            f"Splitting {len(documents)} documents with parameters: split_by={split_by}, "
            f"split_length={split_length}, split_overlap={split_overlap}"
        )

        splitter = DocumentSplitterComponent(
            split_by=split_by,
            split_length=split_length,
            split_overlap=split_overlap,
        )

        output = splitter.run(documents=documents)

        split_documents = output["documents"]
        logger.debug(f"Split {len(documents)} documents into {len(split_documents)} parts")

        # Tool convention: results are returned under the "content" key.
        return {
            "content": split_documents,
        }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,225 @@ | ||
| from typing import Any, ClassVar, Literal | ||
|
|
||
| from pydantic import BaseModel, Field, model_validator | ||
|
|
||
| from dynamiq.connections.managers import ConnectionManager | ||
| from dynamiq.nodes import ErrorHandling, Node | ||
| from dynamiq.nodes.agents.exceptions import ToolExecutionException | ||
| from dynamiq.nodes.embedders.base import DocumentEmbedder | ||
| from dynamiq.nodes.node import NodeDependency, NodeGroup, ensure_config | ||
| from dynamiq.nodes.writers.base import Writer | ||
| from dynamiq.runnables import RunnableConfig | ||
| from dynamiq.types import Document | ||
| from dynamiq.utils.logger import logger | ||
|
|
||
# Tool description surfaced to LLM agents so they can decide when and how to
# call VectorStoreWriter. Fixed typos in the original text ("Guildelines" ->
# "Guidelines", "vector story entry" -> "vector store entry") since agents
# read this text verbatim.
DESCRIPTION_VECTOR_STORE_WRITER = """Writes documents (or text) to a vector store.

Key Capabilities:
- Adding textual content to the vector store as separate database entries
- Adding metadata to the vector store entries

Parameter Guide:
- documents: List of strings to write to the vector store along with their metadata.

Guidelines:
- The vector store entry metadata may consist of the following fields (but is not limited to):
    - url
    - title
    - description
    - author
    - published_date
    - source
    - category
    - tags
    - etc.
- If any metadata field is provided by the user, it should be included in the vector store entry metadata.
- The input documents should be a list of dictionaries with the following structure:
    - { "documents": [{"content": "<content of the vector store entry>","metadata": {"<field name>": "<field value>"}}]}

Examples:
- {
    "documents": [
        {
            "content": "Artificial intelligence is transforming healthcare by improving diagnostics and patient care.",
            "metadata": {
                "title": "AI in Healthcare",
                "author": "Jane Doe",
                "published_date": "2025-09-10",
                "source": "Nature Medicine",
                "url": "https://www.nature.com/articles/ai-healthcare",
                "category": "Healthcare",
                "tags": ["AI", "medicine", "technology"]
            }
        }
    ]
}
- {
    "documents": [
        {
            "content": "OpenAI has announced a new framework for autonomous agents capable of reasoning and planning.",
            "metadata": {
                "title": "Next-Gen AI Agents",
                "author": "OpenAI Research Team",
                "published_date": "2025-07-01",
                "category": "Artificial Intelligence",
                "tags": ["AI", "agents", "research"]
            }
        }
    ]
}
"""
|
|
||
|
|
||
class VectorStoreWriterInputSchema(BaseModel):
    """Input schema for VectorStoreWriter.

    Accepts either Document instances or plain dicts (the latter is what an
    agent typically supplies); dicts are normalized to Documents on validation.
    """

    documents: list[Document] | list[dict] = Field(
        ...,
        description="Parameter to provide documents to write to the vector store.",
    )

    @model_validator(mode="after")
    def validate_input_documents(self):
        """
        Validate the input documents by converting dictionaries to Documents
        (when using inside an agent) and ensuring metadata is never None.

        Each element is normalized independently, so lists mixing dicts and
        Documents are handled correctly (the original code branched on the
        type of the first element only). Caller-supplied dicts are copied
        before normalization so the input is never mutated.

        Raises:
            ValueError: If a dict entry lacks a non-empty "content" field.
        """
        normalized = []
        for doc in self.documents:
            if isinstance(doc, dict):
                if not doc.get("content", ""):
                    raise ValueError("Document dict must contain 'content' field")
                # Copy to avoid mutating the caller's dict; coerce falsy/missing
                # metadata to an empty dict.
                payload = {**doc, "metadata": doc.get("metadata") or {}}
                normalized.append(Document(**payload))
            else:
                doc.metadata = doc.metadata or {}
                normalized.append(doc)
        self.documents = normalized
        return self
|
|
||
|
|
||
class VectorStoreWriter(Node):
    """Node for writing documents to a vector store.

    Embeds the input documents with the configured embedder, then writes the
    embedded documents through the configured writer.

    Attributes:
        group (Literal[NodeGroup.TOOLS]): Group for the node. Defaults to NodeGroup.TOOLS.
        name (str): Name of the tool. Defaults to "VectorStore Writer".
        description (str): Description of the tool.
        error_handling (ErrorHandling): Error handling configuration.
        document_embedder (DocumentEmbedder): Document embedder node.
        document_writer (Writer): Document writer node.
    """

    group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
    name: str = "VectorStore Writer"
    description: str = DESCRIPTION_VECTOR_STORE_WRITER
    error_handling: ErrorHandling = Field(default_factory=lambda: ErrorHandling(timeout_seconds=600))
    document_embedder: DocumentEmbedder
    document_writer: Writer

    input_schema: ClassVar[type[VectorStoreWriterInputSchema]] = VectorStoreWriterInputSchema

    def __init__(self, **kwargs):
        """
        Initializes the VectorStoreWriter with the given parameters.

        Args:
            **kwargs: Additional keyword arguments to be passed to the parent class constructor.
        """
        super().__init__(**kwargs)
        self._run_depends = []

    def reset_run_state(self):
        """
        Reset the intermediate steps (run_depends) of the node.
        """
        self._run_depends = []

    def init_components(self, connection_manager: ConnectionManager | None = None) -> None:
        """
        Initialize the components of the tool.

        Args:
            connection_manager (ConnectionManager, optional): connection manager. Defaults to ConnectionManager.
        """
        connection_manager = connection_manager or ConnectionManager()
        super().init_components(connection_manager)
        # Child nodes may defer their own component init; finish it here.
        if self.document_embedder.is_postponed_component_init:
            self.document_embedder.init_components(connection_manager)
        if self.document_writer.is_postponed_component_init:
            self.document_writer.init_components(connection_manager)

    @property
    def to_dict_exclude_params(self):
        """
        Property to define which parameters should be excluded when converting the class instance to a dictionary.

        Returns:
            dict: A dictionary defining the parameters to exclude.
        """
        # Child nodes are serialized explicitly in to_dict, so exclude them here.
        return super().to_dict_exclude_params | {"document_embedder": True, "document_writer": True}

    def to_dict(self, **kwargs) -> dict:
        """Converts the instance to a dictionary.

        Returns:
            dict: A dictionary representation of the instance.
        """
        data = super().to_dict(**kwargs)
        data["document_embedder"] = self.document_embedder.to_dict(**kwargs)
        data["document_writer"] = self.document_writer.to_dict(**kwargs)
        return data

    def execute(
        self, input_data: VectorStoreWriterInputSchema, config: RunnableConfig | None = None, **kwargs
    ) -> dict[str, Any]:
        """Execute the vector store writer tool.

        Args:
            input_data (VectorStoreWriterInputSchema): Input data for the tool.
            config (RunnableConfig, optional): Configuration for the runnable, including callbacks.
            **kwargs: Additional keyword arguments.

        Returns:
            dict[str, Any]: Result of the writing operation with the key "upserted_count".

        Raises:
            ToolExecutionException: If embedding or writing fails; chained to the
                original exception for traceback context.
        """

        logger.info(f"Tool {self.name} - {self.id}: started with INPUT DATA:\n{input_data.model_dump()}")
        config = ensure_config(config)
        self.reset_run_state()
        self.run_on_node_execute_run(config.callbacks, **kwargs)

        documents = input_data.documents

        try:
            # Re-parent child node runs to this run for tracing purposes.
            kwargs = kwargs | {"parent_run_id": kwargs.get("run_id")}
            kwargs.pop("run_depends", None)

            document_embedder_output = self.document_embedder.run(
                input_data={"documents": documents}, run_depends=self._run_depends, config=config, **kwargs
            )
            self._run_depends = [NodeDependency(node=self.document_embedder).to_dict(for_tracing=True)]

            embedded_documents = document_embedder_output.output.get("documents", [])
            logger.debug(f"Tool {self.name} - {self.id}: embedded {len(embedded_documents)} documents")

            document_writer_output = self.document_writer.run(
                input_data={"documents": embedded_documents},
                run_depends=self._run_depends,
                config=config,
                **kwargs,
            )
            self._run_depends = [NodeDependency(node=self.document_writer).to_dict(for_tracing=True)]
            upserted_count = document_writer_output.output.get("upserted_count", 0)
            logger.debug(f"Tool {self.name} - {self.id}: wrote {upserted_count} documents to vector store")

            result = {"upserted_count": upserted_count}
            logger.info(f"Tool {self.name} - {self.id}: finished with RESULT:\n{str(result)[:200]}...")

            return result
        except Exception as e:
            logger.error(f"Tool {self.name} - {self.id}: execution error: {str(e)}", exc_info=True)
            # Chain the original exception ("from e") so the root cause is
            # preserved in tracebacks — the original code dropped it.
            raise ToolExecutionException(
                f"Tool '{self.name}' failed to write documents to the vector store. "
                f"Error: {str(e)}. Please analyze the error and take appropriate action.",
                recoverable=True,
            ) from e
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.