60 changes: 39 additions & 21 deletions kubeflow-pipelines/README.md
@@ -17,6 +17,7 @@ Two KFP pipelines are included:
- Two customizable pipelines to suit different needs:
- Standard PDF pipeline (backends, OCR engines, table structure, image export)
- VLM pipeline (Docling VLM or Granite-Vision pipeline options; remote VLM service supported)
- **Accelerator device selection**: Choose between CPU, CUDA GPU, Apple MPS, or auto-detection for optimal performance
- Multiple input sources: HTTP/S URLs or S3/S3-compatible APIs like MinIO
- Secret-based configuration:
- Remote VLM API configuration via a single mounted Kubernetes Secret
@@ -29,16 +30,23 @@ Two KFP pipelines are included:
```bash
kubeflow-pipelines
|
|- docling-standard
| |- docling_convert_components.py
| |- docling_convert_pipeline.py
| |- docling_convert_pipeline_compiled.yaml (generated)
|- common/
| |- __init__.py
| |- components.py (shared components)
| |- constants.py
|
|- docling-standard/
| |- standard_components.py
| |- standard_convert_pipeline.py
| |- standard_convert_pipeline_compiled.yaml (generated)
| |- local_run.py
| |- requirements.txt
|
|- docling-vlm
|- docling_convert_components.py
|- docling_convert_pipeline.py
|- docling_convert_pipeline_compiled.yaml (generated)
|- docling-vlm/
|- vlm_components.py
|- vlm_convert_pipeline.py
|- vlm_convert_pipeline_compiled.yaml (generated)
|- local_run.py
|- requirements.txt
```

@@ -56,22 +64,22 @@ Start by importing the compiled YAML file for the desired Docling pipeline (stan

**For the Standard Pipeline**:

Download the [compiled YAML file](docling-standard/docling_convert_pipeline_compiled.yaml?raw=1) and upload it on the _Import pipeline_ screen, or import it by URL by pointing it to `https://raw.githubusercontent.com/opendatahub-io/odh-data-processing/refs/heads/main/kubeflow-pipelines/docling-standard/docling_convert_pipeline_compiled.yaml`.
Download the [compiled YAML file](docling-standard/standard_convert_pipeline_compiled.yaml?raw=1) and upload it on the _Import pipeline_ screen, or import it by URL by pointing it to `https://raw.githubusercontent.com/opendatahub-io/odh-data-processing/refs/heads/main/kubeflow-pipelines/docling-standard/standard_convert_pipeline_compiled.yaml`.

**For the VLM Pipeline**:

Download the [compiled YAML file](docling-vlm/docling_convert_pipeline_compiled.yaml?raw=1) and upload it on the _Import pipeline_ screen, or import it by URL by pointing it to `https://raw.githubusercontent.com/opendatahub-io/odh-data-processing/refs/heads/main/kubeflow-pipelines/docling-vlm/docling_convert_pipeline_compiled.yaml`.
Download the [compiled YAML file](docling-vlm/vlm_convert_pipeline_compiled.yaml?raw=1) and upload it on the _Import pipeline_ screen, or import it by URL by pointing it to `https://raw.githubusercontent.com/opendatahub-io/odh-data-processing/refs/heads/main/kubeflow-pipelines/docling-vlm/vlm_convert_pipeline_compiled.yaml`.

Optionally, compile from source to generate the pipeline YAML yourself:

```bash
# Standard pipeline
cd odh-data-processing/kubeflow-pipelines/docling-standard
python docling_convert_pipeline.py
python standard_convert_pipeline.py

# VLM pipeline
cd odh-data-processing/kubeflow-pipelines/docling-vlm
python docling_convert_pipeline.py
python vlm_convert_pipeline.py
```
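
Each `*_convert_pipeline.py` script presumably wraps a call to the KFP compiler. A minimal sketch of that step, assuming KFP SDK v2 (the pipeline function, placeholder component, and output file name below are illustrative, not this repo's exact entry points):

```python
from kfp import compiler, dsl

@dsl.component(base_image="python:3.11")
def say_hello() -> str:
    # Placeholder component so the sketch compiles; the real pipelines wire up Docling components.
    return "hello from a placeholder component"

@dsl.pipeline(name="docling-convert-sketch")
def my_pipeline():
    say_hello()

# Compile the pipeline function into a YAML package that can be imported in the dashboard.
compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path="docling_convert_pipeline_compiled.yaml",
)
```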

## 🖥️ Running
@@ -93,23 +101,32 @@ By default, both pipelines will consume documents stored in an HTTP/S source. To

#### 1) Defaults (Docling with default parameters)

- Standard pipeline defaults include `pdf_backend=dlparse_v4`, `image_export_mode=embedded`, `table_mode=accurate`, `num_threads=4`, `timeout_per_document=300`, `ocr=True`, `force_ocr=False`, `ocr_engine=tesseract`.
- VLM pipeline defaults include `num_threads=4`, `timeout_per_document=300`, `image_export_mode=embedded`, and `remote_model_enabled=False`.
- Standard pipeline defaults include `pdf_backend=dlparse_v4`, `image_export_mode=embedded`, `table_mode=accurate`, `num_threads=4`, `timeout_per_document=300`, `ocr=True`, `force_ocr=False`, `ocr_engine=tesseract`, `accelerator_device=auto`.
- VLM pipeline defaults include `num_threads=4`, `timeout_per_document=300`, `image_export_mode=embedded`, `remote_model_enabled=False`, and `accelerator_device=auto`.
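
If you start runs from the KFP SDK rather than the dashboard, these defaults can be overridden per run roughly as sketched below. The endpoint, YAML file name, and run name are placeholders, and the argument keys shown are the parameter names used elsewhere in this README; check the run form of the imported pipeline for the authoritative list.

```python
from kfp import Client

client = Client(host="https://<your-kfp-endpoint>")  # placeholder endpoint
client.create_run_from_pipeline_package(
    "standard_convert_pipeline_compiled.yaml",
    arguments={
        # Parameter names as documented in this README; verify against the pipeline's run form.
        "docling_ocr_engine": "tesseract",
        "docling_accelerator_device": "cpu",
        "num_splits": 3,
    },
    run_name="docling-standard-sample-run",
)
```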

#### 2) Accelerator device selection

Both pipelines support choosing the accelerator device best suited to your infrastructure; a sketch of how this maps onto Docling's accelerator options follows the list:

- `docling_accelerator_device=auto` (default): Automatically selects the best available device
- `docling_accelerator_device=cpu`: Forces CPU-only processing for cost-effective execution
- `docling_accelerator_device=cuda`: Uses NVIDIA GPU acceleration for faster processing (requires GPU-enabled cluster)
- `docling_accelerator_device=mps`: Uses Apple Metal Performance Shaders (macOS environments)
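
Inside the conversion components, this parameter is presumably translated into Docling's accelerator options, roughly as in this simplified sketch (exact module paths can vary slightly between Docling versions):

```python
from docling.datamodel.pipeline_options import (
    AcceleratorDevice,
    AcceleratorOptions,
    PdfPipelineOptions,
)

# Map the KFP parameter value onto Docling's accelerator device enum.
device_map = {
    "auto": AcceleratorDevice.AUTO,
    "cpu": AcceleratorDevice.CPU,
    "cuda": AcceleratorDevice.CUDA,
    "mps": AcceleratorDevice.MPS,
}

pipeline_options = PdfPipelineOptions()
pipeline_options.accelerator_options = AcceleratorOptions(
    num_threads=4,                 # num_threads
    device=device_map["auto"],     # docling_accelerator_device
)
```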

#### 2) Minor tweaks: image and table modes
#### 3) Minor tweaks: image and table modes

- Standard pipeline parameters (see the sketch after this list):
  - `docling_image_export_mode`: `embedded` (default), `placeholder`, or `referenced`. In `embedded` mode, each image is embedded as a base64-encoded string. With `placeholder`, only the position of the image is marked in the output. In `referenced` mode, each image is exported as a PNG file and referenced from the main exported document.
  - `docling_table_mode`: `accurate` (default) or `fast`. The mode to use for the table structure model.
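
For orientation, a simplified sketch of what these two parameters correspond to on the Docling side; the exact wiring lives in the component code, and `sample.pdf` is just a placeholder input:

```python
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling_core.types.doc import ImageRefMode

pipeline_options = PdfPipelineOptions()
pipeline_options.do_table_structure = True
pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE  # docling_table_mode

converter = DocumentConverter(
    format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)}
)
result = converter.convert("sample.pdf")

# docling_image_export_mode maps onto the export call: EMBEDDED, PLACEHOLDER, or REFERENCED.
result.document.save_as_markdown("sample.md", image_mode=ImageRefMode.EMBEDDED)
```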

#### 3) Forcing OCR
#### 4) Forcing OCR

- Standard pipeline (see the sketch after this list):
  - `docling_ocr=True`: if enabled, bitmap content is processed using OCR.
  - `docling_force_ocr=True`: forces full-page OCR regardless of the input.
  - `docling_ocr_engine`: `tesseract` (default), `tesserocr`, or `rapidocr`. The OCR engine to use.
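
These roughly map onto Docling's OCR pipeline options, as in this simplified sketch (option class names may differ slightly between Docling versions):

```python
from docling.datamodel.pipeline_options import PdfPipelineOptions, TesseractOcrOptions

pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = True                        # docling_ocr
pipeline_options.ocr_options = TesseractOcrOptions(   # docling_ocr_engine
    force_full_page_ocr=True,                         # docling_force_ocr
)
```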

#### 4) Using a VLM remotely
#### 5) Using a VLM remotely

- VLM pipeline (`docling-vlm`): set `docling_remote_model_enabled=True` to route processing through a VLM [model service](https://github.com/rh-aiservices-bu/models-aas).
- Configuration for remote VLM models comes from a Kubernetes Secret mounted at `/mnt/secrets` instead of individual KFP parameters.
@@ -128,7 +145,7 @@
--from-literal=REMOTE_MODEL_NAME="granite-vision-3-2"
```

#### 5) Consuming documents from S3
#### 6) Consuming documents from S3

If you'd like to consume documents stored in S3-compatible object storage rather than from a URL:

@@ -153,7 +170,7 @@ If you'd like to consume documents stored in an S3-compatible object storage rat
--from-literal=S3_PREFIX="my-pdfs"
```

#### 6) Using enrichment models (Standard pipeline)
#### 7) Using enrichment models (Standard pipeline)

Toggle enrichments via boolean parameters (see the sketch after this list):
- `docling_enrich_code`, `docling_enrich_formula`, `docling_enrich_picture_classes`, `docling_enrich_picture_description`.
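
These presumably map onto Docling's enrichment flags on the pipeline options, roughly as sketched here:

```python
from docling.datamodel.pipeline_options import PdfPipelineOptions

pipeline_options = PdfPipelineOptions()
pipeline_options.do_code_enrichment = True          # docling_enrich_code
pipeline_options.do_formula_enrichment = True       # docling_enrich_formula
pipeline_options.do_picture_classification = True   # docling_enrich_picture_classes
pipeline_options.do_picture_description = True      # docling_enrich_picture_description
```
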
@@ -162,6 +179,7 @@ Toggle enrichments via boolean parameters:

- Increase `num_splits` to **parallelize** across more workers (uses KFP `ParallelFor`); see the sketch after this list.
- Tune `num_threads` and `timeout_per_document`.
- Adjust **container resources** per component, e.g. `set_memory_limit("6G")`, `set_cpu_limit("4")`, in [`docling-standard/docling_convert_pipeline.py`](docling-standard/docling_convert_pipeline.py) or [`docling-vlm/docling_convert_pipeline.py`](docling-vlm/docling_convert_pipeline.py).
- Change the value of the `base_image` component parameter ([example](https://github.com/opendatahub-io/odh-data-processing/blob/ceeb8be59910ea2986f33ee4c717ce31be66f1f5/kubeflow-pipelines/docling-standard/docling_convert_components.py#L176)) if you'd like to set a **custom container image** to be used in the pipeline run.
- **Choose accelerator device** based on your infrastructure: `auto`, `cpu`, `cuda`, or `mps`.
- Adjust **container resources** per component, e.g. `set_memory_limit("6G")`, `set_cpu_limit("4")`, in [`docling-standard/standard_convert_pipeline.py`](docling-standard/standard_convert_pipeline.py) or [`docling-vlm/vlm_convert_pipeline.py`](docling-vlm/vlm_convert_pipeline.py).
- Change the value of the `base_image` component parameter if you'd like to set a **custom container image** to be used in the pipeline run.
- Recompile after code or parameter interface changes to refresh the compiled YAML.
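
A rough sketch of how the fan-out and per-task resource limits fit together in KFP v2; `make_splits` and `convert_split` are stand-ins for the repo's actual components, not their real signatures:

```python
from typing import List
from kfp import dsl

@dsl.component(base_image="python:3.11")
def make_splits(num_splits: int) -> List[List[str]]:
    # Stand-in for create_pdf_splits: fabricate some file names for illustration.
    names = [f"doc-{i}.pdf" for i in range(num_splits * 2)]
    return [names[i::num_splits] for i in range(num_splits)]

@dsl.component(base_image="python:3.11")
def convert_split(pdf_split: List[str]) -> str:
    # Stand-in for the real Docling conversion component.
    return f"converted {len(pdf_split)} files"

@dsl.pipeline(name="docling-parallel-sketch")
def parallel_sketch(num_splits: int = 3):
    splits = make_splits(num_splits=num_splits)
    with dsl.ParallelFor(items=splits.output) as split:
        task = convert_split(pdf_split=split)
        task.set_memory_limit("6G")  # per-task memory limit
        task.set_cpu_limit("4")      # per-task CPU limit
```
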
32 changes: 32 additions & 0 deletions kubeflow-pipelines/common/__init__.py
@@ -0,0 +1,32 @@
"""
Common components for Docling Kubeflow Pipelines.

This module contains shared components that are used across different
Docling pipeline implementations (standard and VLM).
"""

# Import all common components to make them easily accessible
from .components import (
    import_pdfs,
    create_pdf_splits,
    download_docling_models,
)

from .constants import (
    PYTHON_BASE_IMAGE,
    DOCLING_BASE_IMAGE,
    MODEL_TYPE_STANDARD,
    MODEL_TYPE_VLM,
    MODEL_TYPE_VLM_REMOTE,
)

__all__ = [
    "import_pdfs",
    "create_pdf_splits",
    "download_docling_models",
    "PYTHON_BASE_IMAGE",
    "DOCLING_BASE_IMAGE",
    "MODEL_TYPE_STANDARD",
    "MODEL_TYPE_VLM",
    "MODEL_TYPE_VLM_REMOTE",
]
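
A pipeline module in one of the sibling directories would presumably import these shared pieces along these lines (the exact import path depends on how the package is laid out on `sys.path`):

```python
# Illustrative import of the shared components and constants.
from common import (
    import_pdfs,
    create_pdf_splits,
    download_docling_models,
    DOCLING_BASE_IMAGE,
)
```
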
212 changes: 212 additions & 0 deletions kubeflow-pipelines/common/components.py
@@ -0,0 +1,212 @@
from typing import List
from kfp import dsl
from .constants import PYTHON_BASE_IMAGE, DOCLING_BASE_IMAGE

@dsl.component(
    base_image=PYTHON_BASE_IMAGE,
    packages_to_install=["boto3", "requests"],  # Required for S3 and HTTP downloads
)
def import_pdfs(
    output_path: dsl.Output[dsl.Artifact],
    filenames: str,
    base_url: str,
    from_s3: bool = False,
    s3_secret_mount_path: str = "/mnt/secrets",
):
    """
    Import PDF filenames (comma-separated) from specified URL or S3 bucket.

    Args:
        filenames: List of PDF filenames to import.
        base_url: Base URL of the PDF files.
        output_path: Path to the output directory for the PDF files.
        from_s3: Whether or not to import from S3.
        s3_secret_mount_path: Path to the secret mount path for the S3 credentials.
    """
    import boto3  # pylint: disable=import-outside-toplevel
    import os  # pylint: disable=import-outside-toplevel
    from pathlib import Path  # pylint: disable=import-outside-toplevel
    import requests  # pylint: disable=import-outside-toplevel

    filenames_list = [name.strip() for name in filenames.split(",") if name.strip()]
    if not filenames_list:
        raise ValueError("filenames must contain at least one filename (comma-separated)")

    output_path_p = Path(output_path.path)
    output_path_p.mkdir(parents=True, exist_ok=True)

    if from_s3:
        if not os.path.exists(s3_secret_mount_path):
            raise ValueError(f"Secret for S3 should be mounted in {s3_secret_mount_path}")

        s3_endpoint_url_secret = "S3_ENDPOINT_URL"
        s3_endpoint_url_file_path = os.path.join(s3_secret_mount_path, s3_endpoint_url_secret)
        if os.path.isfile(s3_endpoint_url_file_path):
            with open(s3_endpoint_url_file_path) as f:
                s3_endpoint_url = f.read()
        else:
            raise ValueError(f"Key {s3_endpoint_url_secret} not defined in secret {s3_secret_mount_path}")

        s3_access_key_secret = "S3_ACCESS_KEY"
        s3_access_key_file_path = os.path.join(s3_secret_mount_path, s3_access_key_secret)
        if os.path.isfile(s3_access_key_file_path):
            with open(s3_access_key_file_path) as f:
                s3_access_key = f.read()
        else:
            raise ValueError(f"Key {s3_access_key_secret} not defined in secret {s3_secret_mount_path}")

        s3_secret_key_secret = "S3_SECRET_KEY"
        s3_secret_key_file_path = os.path.join(s3_secret_mount_path, s3_secret_key_secret)
        if os.path.isfile(s3_secret_key_file_path):
            with open(s3_secret_key_file_path) as f:
                s3_secret_key = f.read()
        else:
            raise ValueError(f"Key {s3_secret_key_secret} not defined in secret {s3_secret_mount_path}")

        s3_bucket_secret = "S3_BUCKET"
        s3_bucket_file_path = os.path.join(s3_secret_mount_path, s3_bucket_secret)
        if os.path.isfile(s3_bucket_file_path):
            with open(s3_bucket_file_path) as f:
                s3_bucket = f.read()
        else:
            raise ValueError(f"Key {s3_bucket_secret} not defined in secret {s3_secret_mount_path}")

        s3_prefix_secret = "S3_PREFIX"
        s3_prefix_file_path = os.path.join(s3_secret_mount_path, s3_prefix_secret)
        if os.path.isfile(s3_prefix_file_path):
            with open(s3_prefix_file_path) as f:
                s3_prefix = f.read()
        else:
            raise ValueError(f"Key {s3_prefix_secret} not defined in secret {s3_secret_mount_path}")

        if not s3_endpoint_url:
            raise ValueError("S3_ENDPOINT_URL must be provided")

        if not s3_bucket:
            raise ValueError("S3_BUCKET must be provided")

        s3_client = boto3.client(
            's3',
            endpoint_url=s3_endpoint_url,
            aws_access_key_id=s3_access_key,
            aws_secret_access_key=s3_secret_key,
        )

        for filename in filenames_list:
            orig = f"{s3_prefix.rstrip('/')}/{filename.lstrip('/')}"
            dest = output_path_p / filename
            print(f"import-test-pdfs: downloading {orig} -> {dest} from s3", flush=True)
            s3_client.download_file(s3_bucket, orig, dest)
    else:
        if not base_url:
            raise ValueError("base_url must be provided")

        for filename in filenames_list:
            url = f"{base_url.rstrip('/')}/{filename.lstrip('/')}"
            dest = output_path_p / filename
            print(f"import-test-pdfs: downloading {url} -> {dest}", flush=True)
            with requests.get(url, stream=True, timeout=30) as resp:
                resp.raise_for_status()
                with dest.open("wb") as f:
                    for chunk in resp.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
Comment on lines +74 to +113

⚠️ Potential issue | 🟠 Major

Handle empty/whitespace S3 prefixes before building the object key

If S3_PREFIX is left blank (quite common when pulling from the bucket root), the current expression builds orig = "/<filename>", which does not match the actual key and the download fails. Newlines from the secret files also bleed into the key and credentials because we never strip them. Please trim the secret contents and only prepend the prefix when it’s non-empty.

```diff
-        if os.path.isfile(s3_prefix_file_path):
-            with open(s3_prefix_file_path) as f:
-                s3_prefix = f.read()
+        if os.path.isfile(s3_prefix_file_path):
+            with open(s3_prefix_file_path) as f:
+                s3_prefix = f.read().strip()
         else:
             raise ValueError(f"Key {s3_prefix_secret} not defined in secret {s3_secret_mount_path}")
 
-        if not s3_endpoint_url:
+        s3_endpoint_url = s3_endpoint_url.strip()
+        s3_access_key = s3_access_key.strip()
+        s3_secret_key = s3_secret_key.strip()
+        s3_bucket = s3_bucket.strip()
+
+        if not s3_endpoint_url:
             raise ValueError("S3_ENDPOINT_URL must be provided")
 
         if not s3_bucket:
             raise ValueError("S3_BUCKET must be provided")
 
         s3_client = boto3.client(
             's3',
             endpoint_url=s3_endpoint_url,
             aws_access_key_id=s3_access_key,
             aws_secret_access_key=s3_secret_key,
         )
 
         for filename in filenames_list:
-            orig = f"{s3_prefix.rstrip('/')}/{filename.lstrip('/')}"
+            key = filename.lstrip('/')
+            prefix = s3_prefix.rstrip('/')
+            orig = f"{prefix}/{key}" if prefix else key
             dest = output_path_p / filename
             print(f"import-test-pdfs: downloading {orig} -> {dest} from s3", flush=True)
             s3_client.download_file(s3_bucket, orig, dest)
```
🧰 Tools
🪛 Ruff (0.13.2)

- 74: Possible hardcoded password assigned to "s3_prefix_secret" (S105)
- 80, 83, 86, 102: Avoid specifying long messages outside the exception class (TRY003)


print("import-test-pdfs: done", flush=True)

@dsl.component(
base_image=PYTHON_BASE_IMAGE,
)
def create_pdf_splits(
input_path: dsl.Input[dsl.Artifact],
num_splits: int,
) -> List[List[str]]:
"""
Create a list of PDF splits.

Args:
input_path: Path to the input directory containing PDF files.
num_splits: Number of splits to create.
"""
from pathlib import Path # pylint: disable=import-outside-toplevel

input_path_p = Path(input_path.path)

all_pdfs = [path.name for path in input_path_p.glob("*.pdf")]
all_splits = [all_pdfs[i::num_splits] for i in range(num_splits)]
filled_splits = list(filter(None, all_splits))
return filled_splits
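
For clarity, the stride slicing above distributes files round-robin across splits and drops empty splits; a tiny illustrative example (not part of the component):

```python
pdfs = ["a.pdf", "b.pdf", "c.pdf", "d.pdf", "e.pdf"]
num_splits = 3
splits = [pdfs[i::num_splits] for i in range(num_splits)]
print(splits)  # [['a.pdf', 'd.pdf'], ['b.pdf', 'e.pdf'], ['c.pdf']]
```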

@dsl.component(
    base_image=DOCLING_BASE_IMAGE,
)
def download_docling_models(
    output_path: dsl.Output[dsl.Artifact],
    pipeline_type: str = "standard",
    remote_model_endpoint_enabled: bool = False,
):
    """
    Download Docling models based on pipeline type and configuration.

    This unified component handles model downloading for different pipeline types:
    - standard : Download traditional Docling models (layout, tableformer, easyocr)
    - vlm : Download Docling VLM models (smolvlm, smoldocling) for local inference
    - vlm-remote : Download Docling VLM models for remote inference

    Args:
        output_path: Path to the output directory for Docling models
        pipeline_type: Type of pipeline (standard, vlm)
        remote_model_endpoint_enabled: Whether to download remote model endpoint models (VLM only)
    """
    from pathlib import Path  # pylint: disable=import-outside-toplevel
    from docling.utils.model_downloader import download_models  # pylint: disable=import-outside-toplevel

    output_path_p = Path(output_path.path)
    output_path_p.mkdir(parents=True, exist_ok=True)

    if pipeline_type == "standard":
        # Standard pipeline: download traditional models
        download_models(
            output_dir=output_path_p,
            progress=True,
            with_layout=True,
            with_tableformer=True,
            with_easyocr=False,
        )
    elif pipeline_type == "vlm" and remote_model_endpoint_enabled:
        # VLM pipeline with remote model endpoint: Download minimal required models
        # Only models set are what lives in fabianofranz repo
        # TODO: figure out what needs to be downloaded or removed
        download_models(
            output_dir=output_path_p,
            progress=False,
            force=False,
            with_layout=True,
            with_tableformer=True,
            with_code_formula=False,
            with_picture_classifier=False,
            with_smolvlm=False,
            with_smoldocling=False,
            with_smoldocling_mlx=False,
            with_granite_vision=False,
            with_easyocr=False,
        )
    elif pipeline_type == "vlm":
        # VLM pipeline with local models: Download VLM models for local inference
        # TODO: set models downloaded by model name passed into KFP pipeline ex: smoldocling OR granite-vision
        download_models(
            output_dir=output_path_p,
            with_smolvlm=True,
            with_smoldocling=True,
            progress=False,
            force=False,
            with_layout=False,
            with_tableformer=False,
            with_code_formula=False,
            with_picture_classifier=False,
            with_smoldocling_mlx=False,
            with_granite_vision=False,
            with_easyocr=False,
        )
    else:
        raise ValueError(f"Invalid pipeline_type: {pipeline_type}. Must be 'standard' or 'vlm'")
8 changes: 8 additions & 0 deletions kubeflow-pipelines/common/constants.py
@@ -0,0 +1,8 @@
# Base container images used across all Docling Kubeflow Pipelines
PYTHON_BASE_IMAGE = "registry.access.redhat.com/ubi9/python-311:9.6-1755074620"
DOCLING_BASE_IMAGE = "quay.io/fabianofranz/docling-ubi9:2.54.0"

# Model types for download_docling_models component
MODEL_TYPE_STANDARD = "standard"
MODEL_TYPE_VLM = "vlm"
MODEL_TYPE_VLM_REMOTE = "vlm-remote"