CoreAI Doc AI Pipelines documentation
coreai-doc-ai-pipeline
Description
This Argo WorkflowTemplate automates a document ingestion and processing pipeline: it starts from an object storage notification (MinIO) and runs several document analysis steps (text extraction, classification, and signature recognition), storing the results in a ClickHouse database.
The workflow is triggered by an external input (minio_data) representing the event notification that MinIO emits when a new file is uploaded to a bucket.
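As an illustration of this trigger, the sketch below decodes a base64-encoded MinIO notification into the fields the pipeline needs. The field names follow the S3-style event schema that MinIO emits; the actual payload layout in a given deployment may differ, so treat this as an assumption.

```python
import base64
import json

def extract_minio_data(minio_data_b64: str) -> dict:
    """Decode a base64-encoded MinIO (S3-style) event notification.

    Assumes the standard S3 event layout: Records[0].s3.bucket.name,
    Records[0].s3.object.key and Records[0].s3.object.eTag.
    """
    event = json.loads(base64.b64decode(minio_data_b64))
    record = event["Records"][0]  # one record per uploaded object
    return {
        "bucket_name": record["s3"]["bucket"]["name"],
        "file_path": record["s3"]["object"]["key"],
        "file_hash": record["s3"]["object"]["eTag"],
    }

# Example: round-trip a sample notification
sample = {"Records": [{"s3": {"bucket": {"name": "documents"},
                              "object": {"key": "invoice.pdf", "eTag": "abc123"}}}]}
encoded = base64.b64encode(json.dumps(sample).encode()).decode()
print(extract_minio_data(encoded))
```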
Repository structure
The repository has the following structure and can be found in the platform team repository:
```mermaid
flowchart TB
A["."] --> A1["README.md"]
A --> B["doc_ai_pipelines"]
B --> B1["pipelines"]
B --> B2["utils"]
A --> C["helm"]
C --> C1["doc-ai-pipelines"]
C1 --> C1a["Chart.yaml"]
C1 --> C1b["templates"]
C1 --> C1c["values.yaml"]
%% … other elements
```

Modifications to the workflow and its deployment are made inside 'helm', while modifications to the containers are made inside 'doc_ai_pipelines'.
Overall Workflow Flow
- Extract Metadata from MinIO Notification
  - Parses the MinIO event to determine the bucket name, file path, and file hash.
- Prepare the Database
  - Checks whether the ClickHouse table exists; creates it if not.
- Insert or Retrieve File Record
  - Inserts a new ClickHouse record for the file if it does not already exist.
  - Retrieves the database record ID and the status of the subsequent processing steps from the metadata JSON field.
- Text Extraction (Conditional)
  - Runs only if text extraction has not been completed and is enabled in the workflow configuration.
- Text Classification (Conditional)
  - Runs only if classification has not been completed and is enabled in the workflow configuration.
- Signature Recognition (Conditional)
  - Runs only if signature detection has not been completed and is enabled in the workflow configuration.
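The conditional logic these steps encode can be sketched as plain Python. This is a hypothetical illustration, not the actual template: the state names mirror the workflow's output parameters, and the config keys mirror the Helm enable flags described later in this document.

```python
def run_pipeline(states: dict, config: dict) -> list:
    """Return the processing steps that would run, given per-step states
    (read from the ClickHouse metadata field) and Helm-style enable flags."""
    executed = []
    if states.get("text-extraction-state") != "Succeeded" \
            and config["textExtraction"]["enable"]:
        executed.append("text-extraction")
    # Classification needs extraction to be complete (now or previously).
    if ("text-extraction" in executed or states.get("text-extraction-state") == "Succeeded") \
            and states.get("text-classification-state") != "Succeeded" \
            and config["textClassification"]["enable"]:
        executed.append("text-classification")
    # Signature recognition is independent of text extraction.
    if states.get("signature-recognition-state") != "Succeeded" \
            and config["signatureDetection"]["enable"]:
        executed.append("signature-recognition")
    return executed
```

For a document whose text was already extracted in a previous run, only classification and signature recognition would be scheduled.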
DAG Task Execution Order
```mermaid
flowchart TB
A["extract-minio-data"] --> A1["check-database"]
A1 --> B1["create-or-find-clickhouse-row"]
B1 --> C1["text-extraction"]
B1 --> C2["signature-recognition"]
C1 --> D1["text-classification"]
```

Conditions
- Text extraction runs only if not previously completed and enabled in the Helm values.
- Text classification runs only if text extraction is complete, classification has not already been done, and it is enabled in the Helm values.
- Signature recognition runs in parallel after create-or-find-clickhouse-row if not already done and enabled in the Helm values.
Task Details
1. extract-minio-data
- Purpose: Extracts the bucket name, file path, and file hash from a MinIO notification (base64-encoded JSON).
- Outputs: bucket_name, file_path, file_hash.
2. check-database
- Purpose: Ensures the ClickHouse table exists; creates it if missing.
- Critical: Outputs nothing, but failure aborts the workflow.
3. create-or-find-clickhouse-row
- Purpose: Inserts a file record if not present; retrieves the record ID and previous processing step statuses from metadata.
- Outputs: row_id, text-extraction-state, text-classification-state, signature-recognition-state.
4. text-extraction
- Purpose: Extracts textual content from the file and updates ClickHouse.
- Condition: Runs when text extraction has not succeeded before and is explicitly enabled.
5. text-classification
- Purpose: Classifies the extracted text (e.g., into predefined categories).
- Condition: Runs only if classification hasn’t succeeded before and is enabled.
6. signature-recognition
- Purpose: Detects presence and metadata of signatures in the document.
- Condition: Runs only if detection hasn’t succeeded before and is enabled.
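The create-or-find-clickhouse-row step can be sketched as below. This is an assumption-laden illustration: the client is injected and expected to expose a clickhouse-connect style `query()`/`command()` API, the default state name "Pending" and the helper name `create_or_find_row` are hypothetical, and the real step runs inside the Argo template.

```python
import json
import uuid

def create_or_find_row(client, table: str, bucket: str, path: str, file_hash: str):
    """Insert a file record if absent; return (row_id, per-step states).

    `client` is assumed to expose a clickhouse-connect style API:
    query(sql, parameters=...) -> result with .result_rows, and command(sql, parameters=...).
    """
    rows = client.query(
        f"SELECT id, metadata FROM {table} WHERE file_hash = %(h)s",
        parameters={"h": file_hash},
    ).result_rows
    if rows:
        row_id, metadata = rows[0]          # record already exists: reuse it
    else:
        row_id = str(uuid.uuid4())
        metadata = "{}"
        client.command(
            f"INSERT INTO {table} (id, bucket, file_path, file_hash, metadata) "
            f"VALUES (%(i)s, %(b)s, %(p)s, %(h)s, %(m)s)",
            parameters={"i": row_id, "b": bucket, "p": path, "h": file_hash, "m": metadata},
        )
    states = json.loads(metadata or "{}")
    # Default each step to "Pending" (hypothetical label) when never recorded.
    return row_id, {
        step: states.get(step, "Pending")
        for step in ("text-extraction-state", "text-classification-state",
                     "signature-recognition-state")
    }
```

Because the lookup is keyed on the file hash, re-uploading the same file finds the existing row instead of creating a duplicate, which is what makes reprocessing idempotent.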
2. DAG Tasks and Parameters Table
| Task Name | Template / TemplateRef | Depends On | Parameters (Name → Source) | Outputs | Conditional Execution |
|---|---|---|---|---|---|
| extract-minio-data | extract-minio-data | None | minio_data → workflow.parameters.minio_data | bucket_name, file_path, file_hash | Always runs |
| check-database | check-database | extract-minio-data | table_name → workflow.parameters.table_name | None | Always runs |
| create-or-find-clickhouse-row | create-or-find-clickhouse-row | check-database | bucket_name → tasks.extract-minio-data.outputs.parameters.bucket_name; file_path → ...; file_hash → ...; table_name → workflow.parameters.table_name | row_id, text-extraction-state, text-classification-state, signature-recognition-state | Always runs |
| text-extraction | TemplateRef: extract-text-template | create-or-find-clickhouse-row | table → workflow.parameters.table_name; row-id → from create-or-find-clickhouse-row; bucket-name & file-path → from extract-minio-data | None | Runs if text-extraction-state != Succeeded AND .Values.textExtraction.enable == true |
| text-classification | TemplateRef: classify-text-template | text-extraction | table → workflow parameter; row-id → from create-or-find-clickhouse-row | None | Runs if text-classification-state != Succeeded AND .Values.textClassification.enable == true |
| signature-recognition | TemplateRef: signature-detection-template | create-or-find-clickhouse-row | table → workflow parameter; row-id → from create-or-find-clickhouse-row; bucket-name & file-path → from extract-minio-data | None | Runs if signature-recognition-state != Succeeded AND .Values.signatureDetection.enable == true |
3. Key Implementation Insights
- Helm Templating: The workflow is parameterized with Helm {{ .Values... }} syntax, making it deployable in multiple environments with different configuration values.
- ClickHouse as Persistent Store: Acts as the source of truth for document processing statuses, enabling idempotent reprocessing when needed.
- MinIO Integration: Starts processing from MinIO event payloads.
- Conditional DAG Execution: Optimizes runtime by skipping already-completed steps.
- Security: Uses secrets for credentials and CA certificates for secure connections.
- Go Operator SDK Controller: A separate controller, built with the Go Operator SDK, logs the state of each DAG task in the ClickHouse database; this prevents the same task from being executed twice for the same document.
Components
MinIO storage
Files are uploaded to MinIO object storage. A dedicated bucket must be created manually to hold the files, and its name must match the bucket name configured in the Helm values file.
When a file is uploaded to this bucket, the Argo Events event source and sensor trigger a new workflow in Argo Workflows with the information needed to retrieve the file.
- Supported file formats are: 'application/pdf', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'application/vnd.openxmlformats-officedocument.presentationml.presentation', 'image/jpeg', 'image/jpg', 'image/png', 'image/tiff', 'image/bmp', 'text/markdown', 'text/plain', 'text/csv', 'text/asciidoc', 'application/xml', 'application/json'
There is also a file size limit that can be configured in the Helm values file.
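A pre-flight check that mirrors these constraints could look like the sketch below. The function name `is_processable` and the way the size limit is passed in are hypothetical; in the real deployment both the format list and the limit come from the Helm values.

```python
# Supported MIME types, copied from the documentation above.
SUPPORTED_TYPES = {
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "image/jpeg", "image/jpg", "image/png", "image/tiff", "image/bmp",
    "text/markdown", "text/plain", "text/csv", "text/asciidoc",
    "application/xml", "application/json",
}

def is_processable(content_type: str, size_bytes: int, max_size_bytes: int) -> bool:
    """Accept a file only if its MIME type is supported and it is under the size limit."""
    return content_type in SUPPORTED_TYPES and size_bytes <= max_size_bytes
```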
Docai application
The docai application must run inside Argo CD and be deployed from the Helm chart with the proper values file. The application contains all the components required to interact with MinIO, ClickHouse, and the workflow dependencies.
When a file is uploaded to MinIO, the application triggers a workflow in Argo Workflows.
Clickhouse database
All extracted document data is stored in the ClickHouse database.
The table for the documents in Clickhouse DB has the following structure:
| Column Name | Data Type | Description / Purpose |
|---|---|---|
| id | UUID (default generateUUIDv4()) | Unique identifier for each document record, automatically generated if not provided. |
| bucket | String | The MinIO (S3-compatible) bucket name where the file is stored. |
| file_path | String | The file key/path within the bucket. |
| file_hash | String | The file's hash (e.g., MD5/S3 ETag) for deduplication and integrity tracking. |
| content | String | Extracted raw text content of the document. Initially empty until text extraction is done. |
| file_type | String | File format/type (e.g., PDF, DOCX, TXT), populated during processing. |
| classes | String | Classification labels/tags for the document, typically JSON or comma-separated values from text classification. |
| metadata | String | JSON string containing document metadata (e.g., page count, processing timestamps, per-step statuses). |
| tables | String | Extraction results of tabular data from the document, stored as a serialized string/JSON. |
| signature | UInt8 | Flag indicating whether a signature was detected (0 = no, 1 = yes). |
| signature_metadata | String | Additional details about signatures found (location, type, etc.) in a serialized form. |
| score | Float64 | Numeric score for the document, which could represent confidence, relevance, or a custom metric. |
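The check-database step, which ensures this table exists, can be sketched from the schema above. The column list matches the table; the MergeTree engine, the ORDER BY key, and the clickhouse-connect style `command()` client are assumptions, so the deployed chart's DDL may differ.

```python
# DDL reconstructed from the schema table above; engine and ORDER BY are assumed.
DDL = """
CREATE TABLE IF NOT EXISTS {table} (
    id UUID DEFAULT generateUUIDv4(),
    bucket String,
    file_path String,
    file_hash String,
    content String,
    file_type String,
    classes String,
    metadata String,
    tables String,
    signature UInt8,
    signature_metadata String,
    score Float64
) ENGINE = MergeTree
ORDER BY (bucket, file_path)
"""

def ensure_table(client, table: str) -> None:
    """Create the documents table if it does not already exist.
    `client` is assumed to expose a clickhouse-connect style command() method."""
    client.command(DDL.format(table=table))
```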
Controller
The controller logs the states of the tasks of the DAG. Its code can be found in the platform team repository; it is built with the Operator SDK in Go.
When a task changes state in the workflow, the controller automatically logs the new state in the ClickHouse database, in the metadata field of the document. The tasks to monitor are passed to the controller through a Helm value (see the Helm values file), so the set of monitored tasks can be customized.
Logging task states in the database lets the workflow avoid executing a task twice: before running a task, the workflow checks the state previously recorded in the ClickHouse database.
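The state update the controller performs can be illustrated as follows. The real controller is written in Go with the Operator SDK; this Python sketch only shows the merge-into-metadata logic, assuming a clickhouse-connect style client and a ClickHouse `ALTER TABLE ... UPDATE` mutation. The helper name `record_task_state` is hypothetical.

```python
import json

def record_task_state(client, table: str, row_id: str, task: str, phase: str) -> None:
    """Merge a task's new phase into the document's metadata JSON field.

    Illustrative only: the deployed controller is a Go Operator SDK component.
    `client` is assumed to expose query(sql, parameters=...) -> .result_rows
    and command(sql, parameters=...).
    """
    rows = client.query(
        f"SELECT metadata FROM {table} WHERE id = %(i)s",
        parameters={"i": row_id},
    ).result_rows
    metadata = json.loads(rows[0][0] or "{}") if rows else {}
    metadata[f"{task}-state"] = phase  # e.g. "text-extraction-state": "Succeeded"
    client.command(
        f"ALTER TABLE {table} UPDATE metadata = %(m)s WHERE id = %(i)s",
        parameters={"m": json.dumps(metadata), "i": row_id},
    )
```

Because the workflow reads these per-step states back in create-or-find-clickhouse-row, any task already marked Succeeded is skipped on the next run for the same file.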