Orquestador Pipelines - IngelCoding
Origen: GitHub Copilot (claude-sonnet-4.6) — actualizado 2026-05-18 Última actualización: 2026-05-18 (refleja registry V2 + waves paralelas + alertas_waypoint)
Ver también: ADR-Pipeline-V2-Framework (decisión del framework), Reference-BasePipeline (contrato técnico), Como-Crear-Pipeline-Nuevo (guía operativa), ADR-Zonal-Gestora-Derivada, ADR-Source-Connectors (decisión pendiente meta-orquestador).
Visión General
Sección titulada «Visión General»El orquestador run_pipeline.py gestiona la ejecución de todos los pipelines de datos de IngelCoding desde un punto de entrada centralizado.
Pipelines Registrados
Sección titulada «Pipelines Registrados»El orquestador mantiene tres registries internos según el modo de ejecución:
PIPELINES_V2 — pipelines in-process (BasePipeline)
Sección titulada «PIPELINES_V2 — pipelines in-process (BasePipeline)»| Pipeline | Domain | needs_mail | accepts_dates | Notas |
|---|---|---|---|---|
pedidos_SAP | domains.pedidos_sap | ✅ | ✅ | |
pedidos_HES | domains.pedidos_hes | ✅ | ✅ | publish_step: cruce_facturacion |
facturacion | domains.facturacion | ✅ | ✅ | publish_step: cruce_facturacion |
valorizaciones | domains.valorizaciones | ✅ | ✅ | |
gantt | domains.gantt | ✅ | ✅ | |
cierres_tecnicos | domains.cierres_tecnicos | ❌ | ❌ | sin IMAP (lee local) |
pagos_pendientes | domains.pagos_pendientes | ❌ | ❌ | sin IMAP |
alertas_waypoint | domains.alertas_waypoint | ✅ | ✅ | nuevo 2026-05 |
PIPELINES_LEGACY — scripts subprocess
Sección titulada «PIPELINES_LEGACY — scripts subprocess»| Pipeline | Script | Notas |
|---|---|---|
cruce_facturacion | scripts/reports/CRUCE_FACTURACION_VS_FACTURAS.py | Usado como publish_step de facturacion y pedidos_HES |
fetch_saldos | scripts/updates/fetch_missing_reports.py --mode saldos | Pre-step de pagos_full |
fetch_cierres | scripts/updates/fetch_missing_reports.py --mode control |
PIPELINES_COMPOUND — pre-step legacy + main V2
Sección titulada «PIPELINES_COMPOUND — pre-step legacy + main V2»| Pipeline | Composición |
|---|---|
pagos_full | fetch_saldos (legacy) + pagos_pendientes (V2) |
Pipelines / scripts NO registrados (deuda)
Sección titulada «Pipelines / scripts NO registrados (deuda)»| Item | Tipo | Estado |
|---|---|---|
costos | Pipeline parcial | domains/costos/main_costos.py existe sin clase V2 ni YAML |
data_warehouse | Consolidador analítico | scripts/db/data_warehouse.py standalone — debería ser epilogue |
generate_dim_ot | Generador dimensión | scripts/generate_dim_ot.py standalone |
seed_dim_proceso | Seeder dimensión | scripts/seed_dim_proceso.py standalone |
pbi_refresh | Refresh Power BI Service | scripts/pbi_refresh.py standalone |
Ver: discusión 2026-05-18 sobre meta-orquestador (ADR pendiente).
Parámetros Common
Sección titulada «Parámetros Common»Flags Globales
Sección titulada «Flags Globales»| Flag | Alias | Descripción |
|---|---|---|
--all | -a | Ejecutar todos los pipelines en secuencia |
--verbose | -v | Logging detallado |
--skip-revision | Para cierres_tecnicos: omitir paso 1 (generar revisión) | |
--dry-run | Para cierres_tecnicos: solo generar, no subir a GSheets |
--fecha-desde: Fecha inicio de extracción--fecha-hasta: Fecha fin de extracción
no_date_args
Sección titulada «no_date_args»Scripts marked con no_date_args=True no requieren fechas (ej. actualizaciones incrementales).
Estructura de Dominio
Sección titulada «Estructura de Dominio»Dominio cierres_tecnicos
Sección titulada «Dominio cierres_tecnicos»Pipeline de dos pasos:
- generate_revision: Genera revisión desde CONTROL OTs.py → parquet temporal
- write_cierres_to_sheets: Escribe a Google Sheets via ACTUALIZAR_CIERRES_TECNICOS.py
domains/cierres_tecnicos/├── __init__.py├── generate_revision.py # Wrapper de CONTROL OTs.py└── write_cierres_to_sheets.py # Wrapper de ACTUALIZAR_CIERRES_TECNICOS.pyFlags de main:
--zonal: Procesar por zona específica--skip-revision: Omitir paso 1 (generar revisión)--dry-run: Simulación
Integración de Scripts Legacy
Sección titulada «Integración de Scripts Legacy»Patrón Wrapper con importlib
Sección titulada «Patrón Wrapper con importlib»import importlib
# Cargar script legacy como módulolegacy_module = importlib.import_module("scripts.CONTROL_OTs")
# Llamar función del scriptresult = legacy_module.main(...)argparse SUPPRESS
Sección titulada «argparse SUPPRESS»Scripts de Capa 2 usan parser.add_argument('--param', help=argparse.SUPPRESS) para no aparecer en help del orquestador.
BUG/FIX Conocidos
Sección titulada «BUG/FIX Conocidos»main_pagos_pendientes.py - Fix de fecha
Sección titulada «main_pagos_pendientes.py - Fix de fecha»- Status: ✅ Corregido 2026-04-13
- Issue: Usaba
datetime.now()en B1 (fecha genérica) - Fix: Ahora inyecta la fecha del mail (del nombre del archivo) en B1
- Aplica a:
pagos_pendientes(solo lectura) ypagos_full(fetch + escribir)
Orden de Ejecución (—all) — modelo de waves paralelas
Sección titulada «Orden de Ejecución (—all) — modelo de waves paralelas»Desde 2026-04 el orquestador ejecuta los pipelines en waves: cada wave espera a que la anterior complete; dentro de la wave los pipelines corren en paralelo (ThreadPoolExecutor).
_PIPELINE_WAVES: list[list[str]] = [ # Wave 1: fetchers (pre-requisitos) ["fetch_saldos", "fetch_cierres"], # Wave 2: pipelines principales (independientes entre sí) ["pedidos_HES", "facturacion", "valorizaciones", "pedidos_SAP", "gantt", "alertas_waypoint", "cierres_tecnicos", "pagos_pendientes", "pagos_full"],]Post-wave: los publish_steps declarados por pipelines V2 (ej. cruce_facturacion de facturacion + pedidos_HES) se deduplican y ejecutan una sola vez al final cuando --publish está activo.
Total: 11 pipelines registrados (3 legacy + 8 V2 — pagos_full es compound).
Lo que falta en el orquestador (gap conocido)
Sección titulada «Lo que falta en el orquestador (gap conocido)»El orquestador termina después del post-wave de publish_steps. No incluye la consolidación analítica que alimenta Power BI:
Wave 1 → Wave 2 → publish_steps diferidos → [TERMINA] ↑ FALTA: epilogue de consolidación - seed_dim_proceso - generate_dim_ot - data_warehouse --populate - pbi_refreshEstos pasos hoy se corren manualmente con python scripts/db/data_warehouse.py --populate. Decisión arquitectónica pendiente — ver discusión 2026-05-18 sobre meta-orquestador.
Ejemplos de Comandos
Sección titulada «Ejemplos de Comandos»# Ejecutar todos los pipelinespython run_pipeline.py --all
# Ejecutar todos con logging detalladopython run_pipeline.py --all --verbose
# Ejecutar un pipeline específicopython run_pipeline.py facturacion --fecha-desde 2026-01-01 --fecha-hasta 2026-04-30
# Ejecutar cierres_tecnicos sin generar revisión (solo escribir)python run_pipeline.py cierres_tecnicos --skip-revision
# Dry-run para cierres_tecnicospython run_pipeline.py cierres_tecnicos --dry-run
# Ejecutar pagos_full (fetch + escribir)python run_pipeline.py pagos_fullEvolución
Sección titulada «Evolución»| Fecha | Cambio |
|---|---|
| 2026-05-18 | Agregado alertas_waypoint (V2); modelo waves documentado; sección deuda/epilogue agregada |
| 2026-04-13c | 10 pipelines (actualizar_facturas_drive y actualizar_facturacion_hes ahora son post_steps) |
| 2026-04-13b | 12 pipelines (+ fetch_saldos, fetch_cierres, pagos_full, flags —all/—skip-revision, fix fecha pagos) |
| 2026-04-13 | 9 pipelines (agregados cierres_tecnicos + 2× Capa 2) |
| 2026-04-12 | 6 pipelines iniciales |
Ver también
Sección titulada «Ver también»- Dashboard
- Sesión 2026-04-13b
- Decisiones Técnicas
Notas por Pipeline
Sección titulada «Notas por Pipeline»| Pipeline | Nota de Referencia |
|---|---|
pedidos_HES | Pipeline-Pedidos-HES (incluye post-step actualizar_facturacion_hes) |
facturacion | Pipeline-Facturacion (incluye post-step actualizar_facturas_drive) |
valorizaciones | Pipeline-Valorizaciones |
pedidos_SAP | Pipeline-Pedidos-SAP |
gantt | Pipeline-Gantt |
fetch_saldos | Pipeline-Fetch-Saldos |
fetch_cierres | Pipeline-Cierres-Tecnicos |
pagos_pendientes | Pipeline-Pagos-Pendientes |
pagos_full | Pipeline-Pagos-Full |
cierres_tecnicos | Pipeline-Cierres-Tecnicos |
alertas_waypoint | Pipeline-Alertas-Waypoint |
Documentación actualizada 2026-05-18