Describe the bug
When implementing my own S3 to AWS RDS Postgres pipeline, some initial jobs failed due to wrong credentials. In the Web UI, the job shows the status FAILED. When querying the job status via the `unstructured_client.jobs.get_job()` method, I get `JobStatus.COMPLETED`. When querying via `unstructured_client.jobs.get_job_details()`, I get `JobProcessingStatus.FAILED` and can see the failure count on the destination node.
To Reproduce
Build and run a workflow with a Postgres destination:
```python
def _build_s3_postgres_workflow(
    self, embeddings_create_request: EmbeddingsCreateRequest
) -> CreateWorkflow:
    source_response = self._configure_s3_unstructured_source(
        bucket_name=embeddings_create_request.s3_bucket,
        prefix=embeddings_create_request.s3_prefix,
    )
    destination_response = self._configure_postgres_data_destination()
    partition_node = WorkflowNode(
        name="Partitioner",
        subtype="vlm",
        type="partition",
        settings={
            "provider": "anthropic",
            "model": "claude-sonnet-4-5-20250929",
        },
    )
    chunk_node = WorkflowNode(
        name="Chunker",
        subtype="chunk_by_title",
        type="chunk",
        settings={
            "new_after_n_chars": 1000,
            "max_characters": 4096,
            "overlap": 150,
        },
    )
    embedder_node = WorkflowNode(
        name="Embedder",
        subtype="azure_openai",
        type="embed",
        settings={"model_name": "text-embedding-3-large"},
    )
    workflow = CreateWorkflow(
        name="S3 Knowledge Base to PostgreSQL Embedding Workflow",
        source_id=source_response.source_connector_information.id,
        destination_id=destination_response.destination_connector_information.id,
        workflow_type=WorkflowType.CUSTOM,
        workflow_nodes=[partition_node, chunk_node, embedder_node],
    )
    return workflow


def _configure_s3_unstructured_source(
    self, bucket_name: str, prefix: str
) -> CreateSourceResponse:
    access_key_id, secret_access_key, session_token = (
        self._get_unstructured_aws_credentials()
    )
    remote_url = self._create_s3_uri(bucket_name, prefix)
    response = self.unstructured_client.sources.create_source(
        request=CreateSourceRequest(
            create_source_connector=CreateSourceConnector(
                name="S3 Studio Knowledge Source",
                type=SourceConnectorType.S3,
                config={
                    "key": access_key_id,
                    "secret": secret_access_key,
                    "token": session_token,
                    "remote_url": remote_url,
                    "recursive": False,
                },
            )
        )
    )
    return response


def _configure_postgres_data_destination(self) -> CreateDestinationResponse:
    username, password = self._get_unstructured_postgres_credentials()
    return self.unstructured_client.destinations.create_destination(
        request=CreateDestinationRequest(
            create_destination_connector=CreateDestinationConnector(
                name="Postgres Studio Knowledge Destination",
                type=DestinationConnectorType.POSTGRES,
                config={
                    "host": os.environ["POSTGRES_HOST"],
                    "database": os.environ["POSTGRES_DB"],
                    "port": os.environ["POSTGRES_PORT"],
                    "username": username,
                    "password": password,
                    "table_name": os.environ["POSTGRES_EMBEDDINGS_TABLE_NAME"],
                    "batch_size": 100,
                    "sslmode": "require",
                },
            )
        )
    )
```
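For completeness, the workflow built above is created and started roughly like this (a sketch; `CreateWorkflowRequest` and `RunWorkflowRequest` are assumed to live in `unstructured_client.models.operations` like the other operation models):

```python
from unstructured_client.models.operations import (  # assumed import path
    CreateWorkflowRequest,
    RunWorkflowRequest,
)

workflow = self._build_s3_postgres_workflow(embeddings_create_request)
create_response = self.unstructured_client.workflows.create_workflow(
    request=CreateWorkflowRequest(create_workflow=workflow)
)
workflow_id = create_response.workflow_information.id
# Kick off a run; the job queried below is produced by this run
self.unstructured_client.workflows.run_workflow(
    request=RunWorkflowRequest(workflow_id=workflow_id)
)
```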
Misconfigure the destination password; the workflow will run and fail. Then query the job status by job ID:
```python
response = unstructured_client.jobs.get_job(
    request=GetJobRequest(job_id="91cf8c87-b00b-43d0-8c12-86ddf9b397b0")
)
job = response.job_information
status = job.status
print(status)
# >> Output: JobStatus.COMPLETED
```
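By contrast, the detailed endpoint does report the failure. A minimal sketch, assuming `GetJobDetailsRequest` sits alongside the other operation models and that the response exposes `job_details.processing_status` (exact field names may differ by SDK version):

```python
from unstructured_client.models.operations import GetJobDetailsRequest  # assumed path

details = unstructured_client.jobs.get_job_details(
    request=GetJobDetailsRequest(job_id="91cf8c87-b00b-43d0-8c12-86ddf9b397b0")
)
# Disagrees with get_job() for the same job:
print(details.job_details.processing_status)
# >> Output: JobProcessingStatus.FAILED
# The per-node stats also show the failure count on the Postgres destination node.
```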
In the Web UI, the same job is shown as FAILED.
Expected behavior
The `unstructured_client` should return `JobStatus.FAILED` and a non-200 status code.
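Until the two endpoints agree, a possible workaround is to poll `get_job_details()` instead of `get_job()` when deciding whether a job succeeded. A sketch under the same field-name assumptions as above (`JobProcessingStatus.COMPLETED` is assumed to be the terminal success value; only `FAILED` is confirmed by this report):

```python
import time

from unstructured_client.models.operations import GetJobDetailsRequest  # assumed path
from unstructured_client.models.shared import JobProcessingStatus  # assumed path


def wait_for_job(client, job_id: str, poll_seconds: float = 10.0) -> JobProcessingStatus:
    """Poll the detailed status, since get_job() can report COMPLETED for failed jobs."""
    while True:
        details = client.jobs.get_job_details(
            request=GetJobDetailsRequest(job_id=job_id)
        )
        status = details.job_details.processing_status  # assumed field name
        if status in (JobProcessingStatus.COMPLETED, JobProcessingStatus.FAILED):
            return status
        time.sleep(poll_seconds)
```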