
bug/get_job returns COMPLETED status on FAILED jobs #314

@kevinpilch

Description

Describe the bug
When implementing my own S3 to AWS RDS Postgres pipeline, I had some initial jobs fail due to wrong credentials. In the web UI, the job shows the status FAILED. When querying the job status via the unstructured_client.jobs.get_job() method, I get a JobStatus.COMPLETED status. When querying via unstructured_client.jobs.get_job_details(), I get a JobProcessingStatus.FAILED and can see the failure count in the destination node.
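
To make the discrepancy concrete, the two calls disagree on the same job roughly as follows. This is a minimal sketch: GetJobDetailsRequest and the job_details.processing_status attribute path are my reading of the SDK's naming conventions and may not match the exact response model.

from unstructured_client.models.operations import GetJobDetailsRequest, GetJobRequest

job_id = "91cf8c87-b00b-43d0-8c12-86ddf9b397b0"

# jobs.get_job() reports the job as completed ...
job_response = unstructured_client.jobs.get_job(
    request=GetJobRequest(job_id=job_id)
)
print(job_response.job_information.status)  # JobStatus.COMPLETED

# ... while jobs.get_job_details() reports it as failed.
details_response = unstructured_client.jobs.get_job_details(
    request=GetJobDetailsRequest(job_id=job_id)
)
# job_details.processing_status is an assumed attribute path (see note above).
print(details_response.job_details.processing_status)  # JobProcessingStatus.FAILED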

To Reproduce
Build and run a workflow with a Postgres destination:

import os

from unstructured_client.models.operations import (
    CreateDestinationRequest,
    CreateDestinationResponse,
    CreateSourceRequest,
    CreateSourceResponse,
)
from unstructured_client.models.shared import (
    CreateDestinationConnector,
    CreateSourceConnector,
    CreateWorkflow,
    DestinationConnectorType,
    SourceConnectorType,
    WorkflowNode,
    WorkflowType,
)

    # Excerpt from the pipeline service class; EmbeddingsCreateRequest
    # is an application-specific request model.
    def _build_s3_postgres_workflow(
        self, embeddings_create_request: EmbeddingsCreateRequest
    ) -> CreateWorkflow:
        source_response = self._configure_s3_unstructured_source(
            bucket_name=embeddings_create_request.s3_bucket,
            prefix=embeddings_create_request.s3_prefix,
        )

        destination_response = self._configure_postgres_data_destination()

        partition_node = WorkflowNode(
            name="Partitioner",
            subtype="vlm",
            type="partition",
            settings={
                "provider": "anthropic",
                "model": "claude-sonnet-4-5-20250929",
            },
        )

        chunk_node = WorkflowNode(
            name="Chunker",
            subtype="chunk_by_title",
            type="chunk",
            settings={
                "new_after_n_chars": 1000,
                "max_characters": 4096,
                "overlap": 150,
            },
        )

        embedder_node = WorkflowNode(
            name="Embedder",
            subtype="azure_openai",
            type="embed",
            settings={"model_name": "text-embedding-3-large"},
        )

        workflow = CreateWorkflow(
            name="S3 Knowledge Base to PostgreSQL Embedding Workflow",
            source_id=source_response.source_connector_information.id,
            destination_id=destination_response.destination_connector_information.id,
            workflow_type=WorkflowType.CUSTOM,
            workflow_nodes=[partition_node, chunk_node, embedder_node],
        )

        return workflow

    def _configure_s3_unstructured_source(
        self, bucket_name: str, prefix: str
    ) -> CreateSourceResponse:
        access_key_id, secret_access_key, session_token = (
            self._get_unstructured_aws_credentials()
        )

        remote_url = self._create_s3_uri(bucket_name, prefix)
        response = self.unstructured_client.sources.create_source(
            request=CreateSourceRequest(
                create_source_connector=CreateSourceConnector(
                    name="S3 Studio Knowledge Source",
                    type=SourceConnectorType.S3,
                    config={
                        "key": access_key_id,
                        "secret": secret_access_key,
                        "token": session_token,
                        "remote_url": remote_url,
                        "recursive": False,
                    },
                )
            )
        )
        return response

    def _configure_postgres_data_destination(self) -> CreateDestinationResponse:
        username, password = self._get_unstructured_postgres_credentials()
        return self.unstructured_client.destinations.create_destination(
            request=CreateDestinationRequest(
                create_destination_connector=CreateDestinationConnector(
                    name="Postgres Studio Knowledge Destination",
                    type=DestinationConnectorType.POSTGRES,
                    config={
                        "host": os.environ["POSTGRES_HOST"],
                        "database": os.environ["POSTGRES_DB"],
                        "port": os.environ["POSTGRES_PORT"],
                        "username": username,
                        "password": password,
                        "table_name": os.environ["POSTGRES_EMBEDDINGS_TABLE_NAME"],
                        "batch_size": 100,
                        "sslmode": "require",
                    },
                )
            )
        )
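
The workflow is then created and run roughly like this. This is a sketch with error handling omitted; workflow_information is the response field name I believe the SDK exposes, and it may differ slightly.

from unstructured_client.models.operations import (
    CreateWorkflowRequest,
    RunWorkflowRequest,
)

workflow = self._build_s3_postgres_workflow(embeddings_create_request)

create_response = self.unstructured_client.workflows.create_workflow(
    request=CreateWorkflowRequest(create_workflow=workflow)
)
workflow_id = create_response.workflow_information.id

# Running the workflow schedules the job whose ID is queried below.
self.unstructured_client.workflows.run_workflow(
    request=RunWorkflowRequest(workflow_id=workflow_id)
)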

Misconfigure the destination password so that the workflow runs and fails. Then query the job status by its job ID:

from unstructured_client.models.operations import GetJobRequest

response = unstructured_client.jobs.get_job(
    request=GetJobRequest(job_id="91cf8c87-b00b-43d0-8c12-86ddf9b397b0")
)

job = response.job_information
status = job.status
print(status)

Output: JobStatus.COMPLETED

In the Web UI:

[Screenshot: the same job is shown with status FAILED]

Expected behavior
The unstructured_client should return a JobStatus.FAILED status and a non-200 status code, consistent with get_job_details() and the web UI.
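
Until this is fixed, a possible client-side workaround is to treat get_job_details() as the source of truth for terminal job status. A hypothetical helper, with the same caveat as above about the exact response attribute names:

from unstructured_client.models.operations import GetJobDetailsRequest
from unstructured_client.models.shared import JobProcessingStatus

def job_has_failed(client, job_id: str) -> bool:
    # Workaround: trust get_job_details(), which reports the failure,
    # over get_job(), which incorrectly reports COMPLETED.
    details = client.jobs.get_job_details(
        request=GetJobDetailsRequest(job_id=job_id)
    )
    # job_details.processing_status is an assumed attribute path (see above).
    return details.job_details.processing_status == JobProcessingStatus.FAILED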

Labels: bug