🕐 O Campo generated_at
O generated_at é o GPS temporal de cada tabela-resumo. Sem ele, você não sabe se está olhando para dados de ontem ou de seis meses atrás — e o agente também não.
💡 A Dica Mais Importante deste Módulo
O generated_at é o seu GPS temporal. Sem ele, você não sabe se está olhando para dados de ontem ou de seis meses atrás. Quando o agente responde ao cliente "a receita do mês passado foi $45.000", ele precisa poder dizer: "esses dados foram calculados em 14/05/2026 às 08:30."
Um dado sem timestamp de geração é um dado sem contexto de validade.
Implementação Correta
from datetime import datetime, timezone
# Ao criar qualquer tabela-resumo:
generated_at = datetime.now(timezone.utc).isoformat()
conn.execute(f"""
CREATE OR REPLACE TABLE revenue_by_client_month AS
SELECT
customer_email,
DATE_TRUNC('month', payment_date) AS month,
SUM(amount_usd) AS total_revenue_usd,
COUNT(*) AS transaction_count,
'{generated_at}'::TIMESTAMP AS generated_at -- <-- OBRIGATÓRIO
FROM stripe_payments
WHERE status = 'succeeded'
GROUP BY customer_email, DATE_TRUNC('month', payment_date)
""")
# Também salvar metadados separadamente
conn.execute(f"""
INSERT INTO pipeline_runs (table_name, generated_at, row_count, status)
VALUES (
'revenue_by_client_month',
'{generated_at}',
(SELECT COUNT(*) FROM revenue_by_client_month),
'SUCCESS'
)
""")
Como o Agente Usa o generated_at
No system prompt do agente, sempre incluir:
# System prompt do agente
Antes de responder qualquer pergunta sobre dados, sempre verifique:
SELECT MAX(generated_at) FROM revenue_by_client_month
Inclua o frescor dos dados na resposta: "Dados atualizados em: {data}"
Se gerado_at for mais de 26 horas atrás, avise o usuário que os dados podem estar desatualizados.
📝 Versionamento de Resumos
Cada reprocessamento é uma nova versão. Sem versionamento, quando o cliente diz "os números mudaram desde ontem", você não tem como comparar a versão atual com a anterior.
Estrutura da Tabela de Metadados
CREATE TABLE IF NOT EXISTS pipeline_runs (
run_id VARCHAR DEFAULT gen_random_uuid(),
table_name VARCHAR NOT NULL,
generated_at TIMESTAMP NOT NULL,
row_count INTEGER,
status VARCHAR, -- 'SUCCESS', 'FAILED', 'PARTIAL'
trigger_reason VARCHAR, -- 'scheduled', 'manual', 'data_quality_fail'
notes TEXT,
PRIMARY KEY (run_id)
);
-- Comparar duas versões
SELECT
r1.generated_at AS versao_atual,
r2.generated_at AS versao_anterior,
r1.row_count - r2.row_count AS delta_linhas
FROM pipeline_runs r1
JOIN pipeline_runs r2 ON r1.table_name = r2.table_name
WHERE r1.table_name = 'revenue_by_client_month'
ORDER BY r1.generated_at DESC
LIMIT 2
✓ Boas práticas de versionamento
- ✓Manter histórico de pelo menos 30 versões
- ✓Registrar motivo de cada reprocessamento
- ✓Alertar se delta de linhas for > 10%
- ✓Manter snapshots de versões suspeitas para investigação
Rollback para versão anterior
Quando o novo reprocessamento gera resultados suspeitos:
- 1. Identificar a última versão válida via pipeline_runs
- 2. Restaurar o snapshot arquivado
- 3. Notificar o agente do rollback com motivo
- 4. Investigar a causa antes de reprocessar
🚨 Sinais de Dado Podre
Detectar antes que o agente detecte — ou pior, antes que o cliente detecte. Os 4 sinais clássicos cobrem 95% das falhas reais de pipelines em produção.
Sinal 1 — Volume Anormal
Registros hoje vs média histórica saindo do range. Falha de ingestão gera queda abrupta; duplicação de dados gera pico.
Threshold: < média - 2σ ou > média + 3σ
Sinal 2 — Schema Drift
Nova coluna aparece sem aviso, coluna existente some, tipo de dado muda. API do Stripe atualizou e adicionou um campo.
Detectar: hash do schema atual vs hash esperado
Sinal 3 — Aumento de Nulos
Percentual de nulos em colunas críticas subindo acima do baseline. Campo obrigatório ficou opcional na fonte.
Threshold: null_rate > baseline + 5pp
Sinal 4 — Valores Fora do Range
Receita negativa, datas futuras, idades impossíveis, percentuais acima de 100%. Erros de migração ou bug na fonte.
Range check: BETWEEN expected_min AND expected_max
Script de Verificação de Qualidade
def verificar_qualidade(conn, table_name):
checks = {}
# Volume
row_count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
checks['volume'] = {'valor': row_count, 'status': 'OK'}
# Nulos em colunas críticas
null_rate = conn.execute(f"""
SELECT AVG(CASE WHEN total_revenue_usd IS NULL THEN 1.0 ELSE 0.0 END)
FROM {table_name}
""").fetchone()[0]
checks['null_rate'] = {
'valor': f"{null_rate*100:.1f}%",
'status': 'FAIL' if null_rate > 0.05 else 'OK'
}
# Range de valores
neg_revenue = conn.execute(f"""
SELECT COUNT(*) FROM {table_name}
WHERE total_revenue_usd < 0
""").fetchone()[0]
checks['range'] = {
'valor': f"{neg_revenue} negativos",
'status': 'FAIL' if neg_revenue > 0 else 'OK'
}
return checks
📋 SLA de Dados
SLA (Service Level Agreement) de dados é o contrato interno que transforma "acho que tá funcionando" em "está dentro do contrato" ou "está fora do contrato".
Exemplo de SLA para o Projeto BrightPath
| Dimensão | Contrato | Ação se violado |
|---|---|---|
| Frequência | Atualização diária até 08h UTC | Alertar engenheiro |
| Frescor | Dados nunca mais velhos que 26h | Bloquear respostas do agente |
| Completude | > 98% em colunas críticas | Alertar e reprocessar |
| Volume mínimo | > 80% da média histórica | Reprocessar automaticamente |
🤖 Automando Reprocessamento com Claude Code
Monitoramento manual é insustentável. Um agente que detecta e reage automaticamente elimina o trabalho repetitivo e garante SLA mesmo durante fins de semana.
Script de verificação com saída estruturada
O script roda verificações e retorna JSON com status OK/FAIL + detalhes. Claude Code analisa o JSON e decide se reprocessar.
# verify_pipeline.py — saída JSON obrigatória
import json, sys
checks = verificar_qualidade(conn, 'revenue_by_client_month')
failed = [k for k, v in checks.items() if v['status'] == 'FAIL']
result = {'status': 'FAIL' if failed else 'OK', 'failed_checks': failed, 'details': checks}
print(json.dumps(result))
sys.exit(1 if failed else 0)
Prompt para Claude Code — Agente de Monitoramento
Você é o agente de monitoramento do pipeline de dados da BrightPath.
Execute: python verify_pipeline.py
Se status == 'FAIL': execute python reprocess_pipeline.py --force
Após reprocessar: execute verify_pipeline.py novamente
Se ainda FAIL: notifique o engenheiro com diagnóstico completo
Registre todas as ações em pipeline_runs com trigger_reason='auto'
Notificação — o que incluir
Severidade + causa + ação tomada + próximos passos. Nunca apenas "algo quebrou". Exemplo: "CRITICAL: null_rate em revenue subiu para 8.3% (SLA: <5%). Tentativa de reprocessamento às 03:47 falhou. Causa provável: API Stripe com timeout. Ação: bloqueei respostas do agente sobre receita. Aguardando intervenção manual."
📖 Documentando o Pipeline
Documentação bem feita permite que outro engenheiro (ou o próprio agente) mantenha o pipeline sem precisar de você. É a diferença entre um projeto sustentável e um projeto que só você consegue operar.
CLAUDE.md — O inventário do agente
- •Lista de todas as fontes de dados com localização
- •Anomalias conhecidas em cada fonte
- •Decisões de processamento documentadas
- •SLAs de cada tabela-resumo
- •Contato para escalada humana
Runbook de Emergência
- •Passo a passo para reprocessamento manual
- •Como fazer rollback para versão anterior
- •Como bloquear o agente de responder dados suspeitos
- •Critério de quando escalar para humano
Diagrama de Fluxo do Pipeline
QuickBooks, etc.
imutáveis
e unificados
+ DuckDB
responde
✅ Resumo do Módulo
Próximo Módulo:
2.6 — Painel de Monitoramento e Alertas: da teoria ao painel real com visibilidade completa