v1-Operativa — Presentación del Hito
Hito que cierra el ciclo: IMAP → procesamiento incremental → publicación trazable. Un solo comando, auditoría completa, reproducibilidad garantizada.
El sistema pasó de 5 scripts independientes a un orquestador unificado que ejecuta todos los pipelines de forma incremental, publica automáticamente y deja rastro de cada operación.
actualizar_pipelines.bat └─ run_pipeline_v2.py ├─ facturacion ─┐ ├─ pedidos_hes ─┤ en paralelo (ThreadPoolExecutor) ├─ pedidos_sap ─┤ ├─ gantt ─┤ └─ valorizaciones ─┘ │ ├─ SALIDAS/*.xlsx (Excel por pipeline) ├─ data_warehouse.db (SQLite con 5 tablas de auditoría) └─ Google Sheets (cruce_facturacion como post-paso único)Tiempo típico: ~3-5 min incremental · ~9-15 min full-rebuild.
Workflow Operativo
Sección titulada «Workflow Operativo»Comando principal
Sección titulada «Comando principal»actualizar_pipelines.batComportamiento por defecto: incremental (calcula ventana automáticamente desde el último run).
Modos disponibles
Sección titulada «Modos disponibles»| flag | comportamiento |
|---|---|
| (sin flags) | incremental (ventana automática) |
--dev | target dev, sin tocar prod |
--full-rebuild | desde __scope_initial_date__ |
--reprocess | offline desde data_raw/, sin IMAP |
--publish-status | diagnóstico de publicaciones recientes |
--scope-status | estado de scopes activos + next_window |
Archivos clave
Sección titulada «Archivos clave»run_pipeline_v2.py— orquestador Pythonactualizar_pipelines.bat— wrapper Windows (prod)actualizar_pipelines_dev.bat— wrapper dev (--dev --target dev)
Versionado en 3 Ejes
Sección titulada «Versionado en 3 Ejes»El sistema mantiene 3 versiones ortogonales: hacer bump en una no invalida las demás.
| eje | controla | bump cuando… |
|---|---|---|
scope_version | criterio de búsqueda IMAP | cambia “qué se captura” |
pipeline_version | lógica de transformación | cambia “cómo se procesa” |
code_version | hash git del commit activo | cualquier commit |
Fuente de verdad por eje
Sección titulada «Fuente de verdad por eje»scope_version→domains/<x>/scope.py::__scope_version__pipeline_version→domains/<x>/__init__.py::__version__code_version→git rev-parse --short HEAD(runtime)
Cálculo de la ventana incremental
Sección titulada «Cálculo de la ventana incremental»¿hay --FECHA_DESDE? → fecha_source = 'cli'¿hay --full-rebuild? → fecha_source = 'full_rebuild' (desde __scope_initial_date__)¿hay --reprocess? → fecha_source = 'reprocess' (lee data_raw/, sin IMAP)¿run previo bajo scope actual? SÍ → fecha_source = 'incremental' (last_run.raw_date_to - 3d overlap) NO → fecha_source = 'seed_initial' (desde __scope_initial_date__)Trazabilidad Runtime
Sección titulada «Trazabilidad Runtime»Cada fila procesada queda etiquetada con metadatos de auditoría inyectados pre-save_processed.
| campo | destino | desde |
|---|---|---|
fecha_ingreso | DataFrame + GSheets | PR #15 / #19 |
code_version | DataFrame + GSheets | PR #15 / #19 |
processed_by_version | SQLite (solo) | PR #15 |
fecha_source | pipeline_runs.parameters | PR #16 |
scope_version | pipeline_runs columna | PR #16 |
Deuda registrada:
processed_by_versionaún no llega al DataFrame de GSheets. Tarea pendiente post-v1.
Observabilidad — 5 Tablas SQLite
Sección titulada «Observabilidad — 5 Tablas SQLite»Todas en SALIDAS/db/ingeldata.db (base operacional — escrita por core/sqlite_store.py).
| tabla | granularidad | qué registra |
|---|---|---|
pipeline_runs | 1 fila/run | status, ventana, fecha_source, scope_version |
pipeline_stage_runs | N filas/run | 1 por stage: ingest/store/process/… |
publish_log | 1 fila/pub. | success/failed, error_msg, rows_published |
raw_files | 1 fila/archivo | archivos descargados + scope_version |
scope_rebuild_history | 1 fila/bump | old/new scope, reason, bumped_by |
Esquema completo
Sección titulada «Esquema completo»pipeline_runs id, pipeline_name, status, started_at, finished_at, raw_date_from, raw_date_to, fecha_source, scope_version, pipeline_version, code_version, parameters (JSON)
pipeline_stage_runs id, run_id (FK), stage_name, status, started_at, finished_at, rows_processed, error_msg
publish_log id, run_id (FK), target (gsheets|excel), status, published_at, error_msg, rows_published
raw_files id, pipeline_name, filename, downloaded_at, scope_version, email_uid
scope_rebuild_history id, pipeline_name, old_scope, new_scope, reason, bumped_at, bumped_byComandos de diagnóstico
Sección titulada «Comandos de diagnóstico»python run_pipeline_v2.py --publish-status # últimas N publicacionespython run_pipeline_v2.py --scope-status # scope_version + last_run + next_windowpython run_pipeline_v2.py --version-status # versiones activas por pipelineLogs por run en SALIDAS/logs/<timestamp>-<pipeline>.log.
Cookbook de Comandos
Sección titulada «Cookbook de Comandos»| caso de uso | comando |
|---|---|
| Cotidiano (prod) | actualizar_pipelines.bat |
| Test sin tocar prod | actualizar_pipelines.bat --dev |
| Re-descarga total | actualizar_pipelines.bat --full-rebuild |
| Diagnóstico publicaciones | python run_pipeline_v2.py --publish-status |
| Estado scopes | python run_pipeline_v2.py --scope-status |
| Reprocesar offline | python run_pipeline_v2.py <pipeline> --reprocess --target dev |
Cómo Recuperarse De…
Sección titulada «Cómo Recuperarse De…»Run fallido en stage publish
Sección titulada «Run fallido en stage publish»python run_pipeline_v2.py <pipeline> --reprocess --target dev# Verifica con --publish-status que el fallo quedó registradopython run_pipeline_v2.py --publish-statusScope desactualizado (captura emails incorrectos)
Sección titulada «Scope desactualizado (captura emails incorrectos)»- Bump
__scope_version__endomains/<x>/scope.py - Correr
--full-rebuildpara ese pipeline - El bump queda en
scope_rebuild_history
Datos en GSheets no coinciden con SQLite
Sección titulada «Datos en GSheets no coinciden con SQLite»Revisar publish_log para la fecha en cuestión. Si el publish falló silenciosamente, rerun con --reprocess.
code_version desconocido en una fila
Sección titulada «code_version desconocido en una fila»El hash corto está en el commit de git. git show <hash> devuelve el estado exacto del código que procesó esa fila.
Deuda Registrada y Roadmap
Sección titulada «Deuda Registrada y Roadmap»| item | prioridad | estado |
|---|---|---|
processed_by_version en GSheets | media | pendiente |
| Scheduler automático | alta | pendiente |
| Notificaciones fallo/éxito | media | pendiente |
| Dashboard observabilidad (portal) | baja | pendiente |
- IngelCoding — hub del proyecto
- Orquestador-Pipelines — detalle del orquestador
- Versionado-Pipelines-Plan — plan de versionado en 3 ejes
- Arquitectura-Completa — diagrama de capas completo
- DB-Structures-IngelCoding — esquema SQLite detallado
- Portal nodos:
v1_operativa·v1_workflow·v1_versionado·v1_trazabilidad·v1_observabilidad·v1_cookbook